diff --git a/.accepted_words.txt b/.accepted_words.txt index 9f28b62c..a8cbb2d4 100644 --- a/.accepted_words.txt +++ b/.accepted_words.txt @@ -98,6 +98,7 @@ MappingClient MappingClientImpl md microsoft +min MockDigitalTwin MockMappingService MQTTProviderProxyImpl diff --git a/Cargo.toml b/Cargo.toml index 0f047dbd..f89d7dce 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,8 +11,10 @@ members = [ "cloud_connectors/azure/mqtt_connector", "cloud_connectors/azure/proto-build", "common", + "contracts", "digital_twin_adapters/ibeji_adapter", "digital_twin_adapters/in_memory_mock_digital_twin_adapter", + "digital_twin_adapters/mock_digital_twin_adapter", "freyja", "mapping_clients/in_memory_mock_mapping_client", "mapping_clients/mock_mapping_service_client", @@ -48,6 +50,7 @@ env_logger = "0.10.0" futures = "0.3.28" httptest = "0.15.4" log = "^0.4" +mockall = "0.11.4" paho-mqtt = "0.12" proc-macro2 = "1.0.52" prost = "0.11.9" diff --git a/cloud_adapters/azure_cloud_connector_adapter/src/azure_cloud_connector_adapter.rs b/cloud_adapters/azure_cloud_connector_adapter/src/azure_cloud_connector_adapter.rs index 38475786..29112e1b 100644 --- a/cloud_adapters/azure_cloud_connector_adapter/src/azure_cloud_connector_adapter.rs +++ b/cloud_adapters/azure_cloud_connector_adapter/src/azure_cloud_connector_adapter.rs @@ -80,7 +80,7 @@ impl AzureCloudConnectorAdapter { #[async_trait] impl CloudAdapter for AzureCloudConnectorAdapter { /// Creates a new instance of a CloudAdapter with default settings - fn create_new() -> Result, CloudAdapterError> { + fn create_new() -> Result { let cloud_connector_client = futures::executor::block_on(async { let config_file = fs::read_to_string(Path::new(env!("OUT_DIR")).join(CONFIG_FILE)) .map_err(CloudAdapterError::io)?; @@ -100,9 +100,9 @@ impl CloudAdapter for AzureCloudConnectorAdapter { .map_err(CloudAdapterError::communication) })?; - Ok(Box::new(Self { + Ok(Self { cloud_connector_client, - })) + }) } /// Sends the signal to the cloud diff --git 
a/cloud_adapters/in_memory_mock_cloud_adapter/Cargo.toml b/cloud_adapters/in_memory_mock_cloud_adapter/Cargo.toml index 0dc6a50e..b6a45d2b 100644 --- a/cloud_adapters/in_memory_mock_cloud_adapter/Cargo.toml +++ b/cloud_adapters/in_memory_mock_cloud_adapter/Cargo.toml @@ -13,6 +13,6 @@ async-trait = { workspace = true } freyja-contracts = { workspace = true } log = { workspace = true } serde = { workspace = true } -serde_json = { workspace = true} -tokio = { workspace = true} +serde_json = { workspace = true } +tokio = { workspace = true } time = { workspace = true } \ No newline at end of file diff --git a/cloud_adapters/in_memory_mock_cloud_adapter/src/in_memory_mock_cloud_adapter.rs b/cloud_adapters/in_memory_mock_cloud_adapter/src/in_memory_mock_cloud_adapter.rs index ae9dcb1c..32240747 100644 --- a/cloud_adapters/in_memory_mock_cloud_adapter/src/in_memory_mock_cloud_adapter.rs +++ b/cloud_adapters/in_memory_mock_cloud_adapter/src/in_memory_mock_cloud_adapter.rs @@ -47,9 +47,8 @@ impl InMemoryMockCloudAdapter { #[async_trait] impl CloudAdapter for InMemoryMockCloudAdapter { /// Creates a new instance of a CloudAdapter with default settings - fn create_new() -> Result, CloudAdapterError> { + fn create_new() -> Result { Self::from_config_file(Path::new(env!("OUT_DIR")).join(CONFIG_FILE)) - .map(|r| Box::new(r) as _) } /// Sends the signal to the cloud diff --git a/common/Cargo.toml b/common/Cargo.toml index 9ff0c1c7..3e43d47e 100644 --- a/common/Cargo.toml +++ b/common/Cargo.toml @@ -9,5 +9,6 @@ edition = "2021" license = "MIT" [dependencies] +freyja-contracts = { workspace = true } log = { workspace = true } tokio = { workspace = true } \ No newline at end of file diff --git a/common/src/lib.rs b/common/src/lib.rs index c58217e3..46e349b3 100644 --- a/common/src/lib.rs +++ b/common/src/lib.rs @@ -2,4 +2,5 @@ // Licensed under the MIT license. 
// SPDX-License-Identifier: MIT +pub mod signal_store; pub mod utils; diff --git a/common/src/signal_store.rs b/common/src/signal_store.rs new file mode 100644 index 00000000..7df31777 --- /dev/null +++ b/common/src/signal_store.rs @@ -0,0 +1,669 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. +// SPDX-License-Identifier: MIT + +use std::{collections::HashMap, sync::RwLock}; + +use freyja_contracts::signal::{Emission, Signal, SignalPatch}; + +/// Stores signals and allows access in a thread-safe manner with support for multiple concurrent readers. +/// Suitable for use as `Arc`. +pub struct SignalStore { + /// The data being stored + signals: RwLock>, +} + +impl SignalStore { + /// Creates an empty SignalStore + pub fn new() -> Self { + Self { + signals: RwLock::new(HashMap::new()), + } + } + + /// Get a value from the store. Returns `None` if the signal was not found. + /// Acquires a read lock. + /// + /// # Arguments + /// - `id`: The id of the entity to retrieve + pub fn get(&self, id: &String) -> Option { + let signals = self.signals.read().unwrap(); + signals.get(id).cloned() + } + + /// Gets a `Vec` containing copies all of the signals in the store. + /// Acquires a read lock. + pub fn get_all(&self) -> Vec { + let signals = self.signals.read().unwrap(); + signals.iter().map(|(_, signal)| signal.clone()).collect() + } + + /// For each signal in the input: + /// - If the incoming signal is already in the data store, apply the patch. + /// - If the incoming signal is not in the data store, create a new signal from the patch. + /// + /// For each signal in the data store: + /// - If the stored signal is not in the input, delete it + /// + /// The previous state of the store is discarded. + /// Acquires a write lock. 
+ /// + /// # Arguments + /// - `incoming_signals`: The list of input signals + pub fn sync(&self, incoming_signals: SyncIterator) + where + SyncIterator: Iterator, + IntoSignalPatch: Into, + { + let mut signals = self.signals.write().unwrap(); + + // This algorithm avoids trying to iterate over incoming_signals multiple times since iterators are consumed in this process. + // If the iterator were cloneable then the implementation could be better, but in general that's not always a feasible constraint. + // This function isn't invoked very often (only when we have a new mapping), so less-than-optimal efficiency is less of a concern. + let size_hint = incoming_signals.size_hint(); + let mut incoming_ids = Vec::with_capacity(size_hint.1.unwrap_or(size_hint.0)); + for value in incoming_signals { + let SignalPatch { + id, + source, + target, + emission_policy, + } = value.into(); + + // We'll use these ids later to only retain entries in the store which were in the incoming list. + // We track it separately from the input iterator since we can't reuse the iterator. + incoming_ids.push(id.clone()); + + signals + .entry(id.clone()) + // If the incoming signal is already in the data store, update only its target and emission policy + .and_modify(|s| { + s.source = source.clone(); + s.target = target.clone(); + s.emission.policy = emission_policy.clone(); + }) + // If the incoming signal is not in the data store, insert a new one + .or_insert(Signal { + id, + source, + target, + emission: Emission { + policy: emission_policy, + ..Default::default() + }, + ..Default::default() + }); + } + + // Delete signals in the store but not in the incoming list + signals.retain(|id, _| incoming_ids.contains(id)); + } + + /// Sets the value of the signal with the given id to the requested value. + /// Returns the old value, or `None` if the signal could not be found. + /// Acquires a write lock. 
+ /// + /// # Arguments + /// - `id`: The id of the signal to edit + /// - `value`: The new value to assign to the signal + pub fn set_value(&self, id: String, value: String) -> Option> { + let mut signals = self.signals.write().unwrap(); + + let mut result = None; + signals.entry(id).and_modify(|s| { + result = Some(s.value.clone()); + s.value = Some(value); + }); + + result + } + + /// Sets the last emitted value of the signal with the given id to the requested value + /// and resets its `next_emission_ms` based on the emission policy. + /// Returns the old value, or `None` if the signal could not be found. + /// Acquires a write lock. + /// + /// # Arguments + /// - `id`: The id of the signal to edit + /// - `value`: The new value to assign to the signal's last emitted value + pub fn set_last_emitted_value(&self, id: String, value: String) -> Option> { + let mut signals = self.signals.write().unwrap(); + + let mut result = None; + signals.entry(id).and_modify(|s| { + result = Some(s.emission.last_emitted_value.clone()); + s.emission.last_emitted_value = Some(value); + s.emission.next_emission_ms = s.emission.policy.interval_ms; + }); + + result + } + + /// Adjusts the emission times of all signals in the store by subtracting the provided interval from next_emission_ms. + /// If overflow would occur, the value saturates at `u64::MIN` (`0`). + /// Returns the updated list of all signals. + /// Acquires a write lock. 
+ /// + /// # Arguments + /// - `interval_ms`: The value to subtract from each signal's next_emission_ms value + pub fn update_emission_times_and_get_all(&self, interval_ms: u64) -> Vec { + let mut signals = self.signals.write().unwrap(); + let mut result = Vec::new(); + + for (_, signal) in signals.iter_mut() { + signal.emission.next_emission_ms = + signal.emission.next_emission_ms.saturating_sub(interval_ms); + result.push(signal.clone()); + } + + result + } +} + +impl Default for SignalStore { + fn default() -> Self { + Self::new() + } +} + +#[cfg(test)] +mod signal_store_tests { + use std::collections::HashSet; + + use freyja_contracts::{ + conversion::Conversion, + entity::Entity, + provider_proxy::OperationKind, + signal::{Emission, EmissionPolicy, Target}, + }; + + use super::*; + + #[test] + fn get_returns_existing_signal() { + const ID: &str = "testid"; + + let uut = SignalStore::new(); + { + let mut signals = uut.signals.write().unwrap(); + let signal = Signal { + id: ID.to_string(), + ..Default::default() + }; + + signals.insert(ID.to_string(), signal); + } + + let result = uut.get(&ID.to_string()); + assert!(result.is_some()); + assert_eq!(result.unwrap().id.as_str(), ID); + } + + #[test] + fn get_returns_none_for_non_existent_signal() { + const ID: &str = "testid"; + + let uut = SignalStore::new(); + { + let mut signals = uut.signals.write().unwrap(); + let signal = Signal { + id: ID.to_string(), + ..Default::default() + }; + + signals.insert(ID.to_string(), signal); + } + + let result = uut.get(&String::from("invalid")); + assert!(result.is_none()); + } + + #[test] + fn get_all_returns_all_signals() { + let mut ids = HashSet::new(); + for id in &["1", "2", "3"] { + ids.insert(id.to_string()); + } + + let uut = SignalStore::new(); + { + let mut signals = uut.signals.write().unwrap(); + + for id in ids.iter() { + let signal = Signal { + id: id.clone(), + ..Default::default() + }; + + signals.insert(id.clone(), signal); + } + } + + let result = 
uut.get_all(); + + assert!(result.len() == ids.len()); + + // HashSet equality checks that both operands have the same contents + assert_eq!( + result + .into_iter() + .map(|s| s.id) + .collect::>(), + ids + ); + } + + #[test] + fn sync_updates_correct_properties() { + const ID: &str = "id"; + const ORIGINAL: &str = "original"; + const INCOMING: &str = "incoming"; + + let original_signal = Signal { + id: ID.to_string(), + value: Some(ORIGINAL.to_string()), + source: Entity { + id: ID.to_string(), + name: Some(ORIGINAL.to_string()), + uri: ORIGINAL.to_string(), + description: Some(ORIGINAL.to_string()), + operation: OperationKind::Get, + protocol: ORIGINAL.to_string(), + }, + target: Target { + metadata: [(ORIGINAL.to_string(), ORIGINAL.to_string())] + .into_iter() + .collect(), + }, + emission: Emission { + policy: EmissionPolicy { + interval_ms: 42, + emit_only_if_changed: false, + conversion: Conversion::None, + }, + next_emission_ms: 42, + last_emitted_value: Some(ORIGINAL.to_string()), + }, + }; + + // Note that everything in this signal is different compared to original_signal + // (except the id) + let incoming_signal = Signal { + id: ID.to_string(), + value: Some(INCOMING.to_string()), + source: Entity { + id: ID.to_string(), + name: Some(INCOMING.to_string()), + uri: INCOMING.to_string(), + description: Some(INCOMING.to_string()), + operation: OperationKind::Subscribe, + protocol: INCOMING.to_string(), + }, + target: Target { + metadata: [(INCOMING.to_string(), INCOMING.to_string())] + .into_iter() + .collect(), + }, + emission: Emission { + policy: EmissionPolicy { + interval_ms: 123, + emit_only_if_changed: true, + conversion: Conversion::Linear { + mul: 1.2, + offset: 3.4, + }, + }, + next_emission_ms: 123, + last_emitted_value: Some(INCOMING.to_string()), + }, + }; + + let uut = SignalStore::new(); + { + let mut signals = uut.signals.write().unwrap(); + signals.insert(ID.to_string(), original_signal.clone()); + } + + 
uut.sync([incoming_signal.clone()].into_iter()); + let updated_signal = uut.get(&ID.to_string()).expect("Test signal should exist"); + + // The following fields should have changed to match the incoming signal: + // - source.* + // - target.* + // - emission.policy.* + assert_eq!(updated_signal.source, incoming_signal.source); + assert_eq!(updated_signal.target, incoming_signal.target); + assert_eq!( + updated_signal.emission.policy, + incoming_signal.emission.policy + ); + + // The following fields should NOT have changed to match the incoming signal: + // - value + // - emission.next_emission_ms + // - emission.last_emitted_value + assert_eq!(updated_signal.value, original_signal.value); + assert_eq!( + updated_signal.emission.next_emission_ms, + original_signal.emission.next_emission_ms + ); + assert_eq!( + updated_signal.emission.last_emitted_value, + original_signal.emission.last_emitted_value + ); + } + + #[test] + fn sync_inserts_new_signal() { + const ID: &str = "id"; + const INCOMING: &str = "incoming"; + + let incoming_signal = Signal { + id: ID.to_string(), + value: Some(INCOMING.to_string()), + source: Entity { + id: ID.to_string(), + name: Some(INCOMING.to_string()), + uri: INCOMING.to_string(), + description: Some(INCOMING.to_string()), + operation: OperationKind::Subscribe, + protocol: INCOMING.to_string(), + }, + target: Target { + metadata: [(INCOMING.to_string(), INCOMING.to_string())] + .into_iter() + .collect(), + }, + emission: Emission { + policy: EmissionPolicy { + interval_ms: 123, + emit_only_if_changed: true, + conversion: Conversion::Linear { + mul: 1.2, + offset: 3.4, + }, + }, + next_emission_ms: 123, + last_emitted_value: Some(INCOMING.to_string()), + }, + }; + + let uut = SignalStore::new(); + + uut.sync([incoming_signal.clone()].into_iter()); + let updated_signal = uut.get(&ID.to_string()).expect("Test signal should exist"); + + // The following fields should match the incoming signal: + // - source.* + // - target.* + // - 
emission.policy.* + assert_eq!(updated_signal.source, incoming_signal.source); + assert_eq!(updated_signal.target, incoming_signal.target); + assert_eq!( + updated_signal.emission.policy, + incoming_signal.emission.policy + ); + + // The following fields should be initialized to default: + // - value + // - emission.next_emission_ms + // - emission.last_emitted_value + assert_eq!(updated_signal.value, Default::default()); + assert_eq!(updated_signal.emission.next_emission_ms, Default::default()); + assert_eq!( + updated_signal.emission.last_emitted_value, + Default::default() + ); + } + + #[test] + fn sync_deletes_signals_not_in_input() { + const ID: &str = "id"; + const ORIGINAL: &str = "original"; + + let original_signal = Signal { + id: ID.to_string(), + value: Some(ORIGINAL.to_string()), + source: Entity { + id: ID.to_string(), + name: Some(ORIGINAL.to_string()), + uri: ORIGINAL.to_string(), + description: Some(ORIGINAL.to_string()), + operation: OperationKind::Get, + protocol: ORIGINAL.to_string(), + }, + target: Target { + metadata: [(ORIGINAL.to_string(), ORIGINAL.to_string())] + .into_iter() + .collect(), + }, + emission: Emission { + policy: EmissionPolicy { + interval_ms: 42, + emit_only_if_changed: false, + conversion: Conversion::None, + }, + next_emission_ms: 42, + last_emitted_value: Some(ORIGINAL.to_string()), + }, + }; + + let uut = SignalStore::new(); + { + let mut signals = uut.signals.write().unwrap(); + signals.insert(ID.to_string(), original_signal.clone()); + } + + uut.sync(Vec::::new().into_iter()); + let maybe_updated_signal = uut.get(&ID.to_string()); + assert!(maybe_updated_signal.is_none()); + } + + #[test] + fn set_value_tests() { + const ID: &str = "testid"; + + let uut = SignalStore::new(); + { + let mut signals = uut.signals.write().unwrap(); + let signal = Signal { + id: ID.to_string(), + ..Default::default() + }; + + signals.insert(ID.to_string(), signal); + } + + // Test first set returns Some(None) and changes state + let value = 
String::from("value"); + let result = uut.set_value(ID.to_string(), value.clone()); + assert!(result.is_some()); + assert!(result.unwrap().is_none()); + { + let signals = uut.signals.read().unwrap(); + assert_eq!( + signals.get(&ID.to_string()).unwrap().value, + Some(value.clone()) + ); + } + + // Test setting non-existent value returns None doesn't change state + let result = uut.set_value(String::from("foo"), String::from("foo")); + assert!(result.is_none()); + { + let signals = uut.signals.read().unwrap(); + assert_eq!( + signals.get(&ID.to_string()).unwrap().value, + Some(value.clone()) + ); + } + + // Test second set returns Some(Some("value")) and changes state + let result = uut.set_value(ID.to_string(), String::from("new value")); + assert!(result.is_some()); + assert!(result.as_ref().unwrap().is_some()); + assert_eq!(result.unwrap().unwrap(), value); + { + let signals = uut.signals.read().unwrap(); + assert_ne!( + signals.get(&ID.to_string()).unwrap().value, + Some(value.clone()) + ); + } + } + + #[test] + fn set_last_emitted_value_tests() { + const ID: &str = "testid"; + const INTERVAL: u64 = 42; + const UPDATED_EMISSION_TIME: u64 = 20; + + let uut = SignalStore::new(); + { + let mut signals = uut.signals.write().unwrap(); + let signal = Signal { + id: ID.to_string(), + emission: Emission { + policy: EmissionPolicy { + interval_ms: INTERVAL, + ..Default::default() + }, + ..Default::default() + }, + ..Default::default() + }; + + signals.insert(ID.to_string(), signal); + } + + // Test first set returns Some(None) and changes state + let value = String::from("value"); + let result = uut.set_last_emitted_value(ID.to_string(), value.clone()); + assert!(result.is_some()); + assert!(result.unwrap().is_none()); + { + let signals = uut.signals.read().unwrap(); + let signal = signals.get(&ID.to_string()).unwrap(); + assert_eq!(signal.emission.last_emitted_value, Some(value.clone())); + assert_eq!(signal.emission.next_emission_ms, INTERVAL); + } + + { + // Simulate 
something changing next_emission_ms, such as the emitter + let mut signals = uut.signals.write().unwrap(); + signals + .entry(ID.to_string()) + .and_modify(|s| s.emission.next_emission_ms = UPDATED_EMISSION_TIME); + } + + // Test setting non-existent value returns None doesn't change state + let result = uut.set_last_emitted_value(String::from("foo"), String::from("foo")); + assert!(result.is_none()); + { + let signals = uut.signals.read().unwrap(); + let signal = signals.get(&ID.to_string()).unwrap(); + assert_eq!(signal.emission.last_emitted_value, Some(value.clone())); + assert_eq!(signal.emission.next_emission_ms, UPDATED_EMISSION_TIME); + } + + { + // Simulate something changing next_emission_ms, such as the emitter + let mut signals = uut.signals.write().unwrap(); + signals + .entry(ID.to_string()) + .and_modify(|s| s.emission.next_emission_ms = UPDATED_EMISSION_TIME); + } + + // Test second set returns Some(Some("value")) and changes state + let result = uut.set_last_emitted_value(ID.to_string(), String::from("new value")); + assert!(result.is_some()); + assert!(result.as_ref().unwrap().is_some()); + assert_eq!(result.unwrap().unwrap(), value); + { + let signals = uut.signals.read().unwrap(); + let signal = signals.get(&ID.to_string()).unwrap(); + assert_ne!(signal.emission.last_emitted_value, Some(value.clone())); + assert_eq!(signal.emission.next_emission_ms, INTERVAL); + } + } + + #[test] + fn update_emission_times_and_get_all_sets_correct_value() { + const ID: &str = "testid"; + const ORIGINAL_VALUE: u64 = 42; + const INTERVAL: u64 = 20; + + let uut = SignalStore::new(); + { + let mut signals = uut.signals.write().unwrap(); + let signal = Signal { + id: ID.to_string(), + emission: Emission { + next_emission_ms: ORIGINAL_VALUE, + ..Default::default() + }, + ..Default::default() + }; + + signals.insert(ID.to_string(), signal); + } + + let mut result = uut.update_emission_times_and_get_all(INTERVAL); + + // Validate the values in the result + 
assert_eq!(result.len(), 1); + let signal = result.pop().unwrap(); + assert_eq!(signal.id, ID.to_string()); + assert_eq!(signal.emission.next_emission_ms, ORIGINAL_VALUE - INTERVAL); + + // Validate the values in the store itself + { + let signals = uut.signals.read().unwrap(); + assert_eq!(signals.len(), 1); + assert!(signals.contains_key(&ID.to_string())); + let signal = signals.get(&ID.to_string()).unwrap(); + assert_eq!(signal.id, ID.to_string()); + assert_eq!(signal.emission.next_emission_ms, ORIGINAL_VALUE - INTERVAL); + } + } + + #[test] + fn update_emission_times_and_get_all_saturates_overflowed_value() { + const ID: &str = "testid"; + const ORIGINAL_VALUE: u64 = 20; + const INTERVAL: u64 = u64::MAX; + + let uut = SignalStore::new(); + { + let mut signals = uut.signals.write().unwrap(); + let signal = Signal { + id: ID.to_string(), + emission: Emission { + next_emission_ms: ORIGINAL_VALUE, + ..Default::default() + }, + ..Default::default() + }; + + signals.insert(ID.to_string(), signal); + } + + let mut result = uut.update_emission_times_and_get_all(INTERVAL); + + // Validate the values in the result + assert_eq!(result.len(), 1); + let signal = result.pop().unwrap(); + assert_eq!(signal.id, ID.to_string()); + assert_eq!(signal.emission.next_emission_ms, 0); + + // Validate the values in the store itself + { + let signals = uut.signals.read().unwrap(); + assert_eq!(signals.len(), 1); + assert!(signals.contains_key(&ID.to_string())); + let signal = signals.get(&ID.to_string()).unwrap(); + assert_eq!(signal.id, ID.to_string()); + assert_eq!(signal.emission.next_emission_ms, 0); + } + } +} diff --git a/contracts/Cargo.toml b/contracts/Cargo.toml index adcb746c..e70536d6 100644 --- a/contracts/Cargo.toml +++ b/contracts/Cargo.toml @@ -17,7 +17,4 @@ serde = { workspace = true } serde_json = { workspace = true } strum = { workspace = true } strum_macros = { workspace = true } -tokio = { workspace = true } - -[dev-dependencies] -rstest = { workspace = true } \ No 
newline at end of file +tokio = { workspace = true } \ No newline at end of file diff --git a/contracts/src/cloud_adapter.rs b/contracts/src/cloud_adapter.rs index ed668878..e51dee57 100644 --- a/contracts/src/cloud_adapter.rs +++ b/contracts/src/cloud_adapter.rs @@ -10,7 +10,7 @@ use serde::{Deserialize, Serialize}; #[async_trait] pub trait CloudAdapter { /// Creates a new instance of a CloudAdapter with default settings - fn create_new() -> Result, CloudAdapterError> + fn create_new() -> Result where Self: Sized; diff --git a/contracts/src/conversion.rs b/contracts/src/conversion.rs index 1caeeaad..dc21edfc 100644 --- a/contracts/src/conversion.rs +++ b/contracts/src/conversion.rs @@ -69,6 +69,12 @@ impl Conversion { } } +impl Default for Conversion { + fn default() -> Self { + Self::None + } +} + #[cfg(test)] mod conversion_tests { use super::*; diff --git a/contracts/src/digital_twin_adapter.rs b/contracts/src/digital_twin_adapter.rs index 6aaacd0f..0f33c76d 100644 --- a/contracts/src/digital_twin_adapter.rs +++ b/contracts/src/digital_twin_adapter.rs @@ -2,27 +2,16 @@ // Licensed under the MIT license. 
// SPDX-License-Identifier: MIT -use std::{ - collections::HashMap, - sync::{Arc, Mutex}, - time::Duration, -}; - use async_trait::async_trait; use serde::{Deserialize, Serialize}; -use crate::{ - entity::{Entity, EntityID}, - provider_proxy_request::{ - ProviderProxySelectorRequestKind, ProviderProxySelectorRequestSender, - }, -}; +use crate::entity::Entity; /// Provides digital twin data #[async_trait] pub trait DigitalTwinAdapter { /// Creates a new instance of a DigitalTwinAdapter with default settings - fn create_new() -> Result, DigitalTwinAdapterError> + fn create_new() -> Result where Self: Sized; @@ -34,79 +23,6 @@ pub trait DigitalTwinAdapter { &self, request: GetDigitalTwinProviderRequest, ) -> Result; - - /// Run as a client to the in-vehicle digital twin provider - /// - /// # Arguments - /// - `entity_map`: shared map of entity ID to entity information - /// - `sleep_interval`: the interval in milliseconds between finding the access info of entities - /// - `provider_proxy_selector_request_sender`: sends requests to the provider proxy selector - async fn run( - &self, - entity_map: Arc>>>, - sleep_interval: Duration, - provider_proxy_selector_request_sender: Arc, - ) -> Result<(), DigitalTwinAdapterError>; - - /// Updates a shared entity map to populate empty values with provider information fetched from the digital twin service. - /// This default implementation is shared for all providers. 
- /// - /// # Arguments - /// - `entity_map`: the map to update - /// - `provider_proxy_selector_request_sender`: sends requests to the provider proxy selector - async fn update_entity_map( - &self, - entity_map: Arc>>>, - provider_proxy_selector_request_sender: Arc, - ) -> Result<(), DigitalTwinAdapterError> - where - Self: Sized, - { - // Copy the shared map - let mut updated_entities = { - let map = entity_map.lock().unwrap(); - map.clone() - }; - - // Update any entries which don't have an entity yet - for (entity_id, entity) in updated_entities.iter_mut().filter(|(_, e)| e.is_none()) { - let request = GetDigitalTwinProviderRequest { - entity_id: entity_id.clone(), - }; - - match self.find_by_id(request).await { - Ok(response) => { - *entity = Some(response.entity.clone()); - - // Notify the provider proxy selector to start a proxy - let Entity { - id, - uri, - operation, - protocol, - .. - } = response.entity; - let request = ProviderProxySelectorRequestKind::CreateOrUpdateProviderProxy( - id, uri, protocol, operation, - ); - - provider_proxy_selector_request_sender - .send_request_to_provider_proxy_selector(request); - } - Err(err) => { - log::error!("{err}"); - } - }; - } - - // Update the shared map - { - let mut map = entity_map.lock().unwrap(); - *map = updated_entities; - } - - Ok(()) - } } /// A request for digital twin providers @@ -154,207 +70,3 @@ proc_macros::error! 
{ Unknown } } - -#[cfg(test)] -mod digital_twin_adapter_tests { - use super::*; - - use crate::provider_proxy::OperationKind; - - use rstest::*; - use tokio::{ - sync::mpsc::{self}, - task::JoinHandle, - }; - - struct TestDigitalTwinAdapter { - entity: Entity, - } - - #[async_trait] - impl DigitalTwinAdapter for TestDigitalTwinAdapter { - fn create_new() -> Result, DigitalTwinAdapterError> - { - Err(DigitalTwinAdapterError::unknown("not implemented")) - } - - async fn find_by_id( - &self, - request: GetDigitalTwinProviderRequest, - ) -> Result { - if self.entity.id == request.entity_id { - Ok(GetDigitalTwinProviderResponse { - entity: self.entity.clone(), - }) - } else { - Err(DigitalTwinAdapterError::entity_not_found("not found")) - } - } - - async fn run( - &self, - _entity_map: Arc>>>, - _sleep_interval: Duration, - _provider_proxy_selector_request_sender: Arc, - ) -> Result<(), DigitalTwinAdapterError> { - Err(DigitalTwinAdapterError::unknown("not implemented")) - } - } - - struct TestFixture { - adapter: TestDigitalTwinAdapter, - entity_id: String, - entity: Entity, - map: Arc>>>, - sender: Arc, - listener_handler: JoinHandle>, - } - - #[fixture] - fn fixture() -> TestFixture { - let entity = Entity { - id: "entity_id".to_string(), - name: Some("name".to_string()), - uri: "uri".to_string(), - description: Some("description".to_string()), - operation: OperationKind::Get, - protocol: "protocol".to_string(), - }; - - let (sender, mut receiver) = mpsc::unbounded_channel::(); - - let listener_handler = tokio::spawn(async move { receiver.recv().await }); - - TestFixture { - adapter: TestDigitalTwinAdapter { - entity: entity.clone(), - }, - entity_id: entity.id.clone(), - entity, - map: Arc::new(Mutex::new(HashMap::new())), - sender: Arc::new(ProviderProxySelectorRequestSender::new(sender)), - listener_handler, - } - } - - fn assert_entry_is_in_map( - entry: (String, Option), - map: Arc>>>, - ) { - let (id, entity) = entry; - let map = map.lock().unwrap(); - let 
value = map.get(&id); - assert!(value.is_some()); - - match entity { - Some(entity) => { - assert!(value.unwrap().is_some()); - let retrieved_entity = value.unwrap().as_ref().unwrap(); - assert_eq!(entity, *retrieved_entity); - } - None => { - assert!(value.unwrap().is_none()); - } - } - } - - // Variation of assert_entry_is_in_map for conveneince - fn assert_entity_is_in_map(entity: Entity, map: Arc>>>) { - assert_entry_is_in_map((entity.id.clone(), Some(entity)), map) - } - - #[rstest] - #[tokio::test] - async fn update_entity_map_updates_none_value(fixture: TestFixture) { - // Setup - { - let mut map = fixture.map.lock().unwrap(); - map.insert(fixture.entity_id.clone(), None); - } - - // Test - let update_result = fixture - .adapter - .update_entity_map(fixture.map.clone(), fixture.sender) - .await; - let join_result = fixture.listener_handler.await; - - // Verify - assert!(update_result.is_ok()); - assert!(join_result.is_ok()); - - assert_entity_is_in_map(fixture.entity.clone(), fixture.map.clone()); - - let proxy_request = join_result.unwrap(); - assert!(proxy_request.is_some()); - let proxy_request = proxy_request.as_ref().unwrap(); - match proxy_request { - ProviderProxySelectorRequestKind::CreateOrUpdateProviderProxy( - entity_id, - uri, - protocol, - operation, - ) => { - assert_eq!(*entity_id, fixture.entity_id); - assert_eq!(*uri, fixture.entity.uri); - assert_eq!(*protocol, fixture.entity.protocol); - assert_eq!(*operation, fixture.entity.operation); - } - _ => panic!("Unexpected proxy request kind: {proxy_request:?}"), - } - } - - #[rstest] - #[tokio::test] - async fn update_entity_map_skips_existing_values(fixture: TestFixture) { - // Setup - { - let mut map = fixture.map.lock().unwrap(); - map.insert(fixture.entity_id, Some(fixture.entity.clone())); - } - - // Test - let update_result = fixture - .adapter - .update_entity_map(fixture.map.clone(), fixture.sender) - .await; - let join_result = fixture.listener_handler.await; - - // Verify - 
assert!(update_result.is_ok()); - assert!(join_result.is_ok()); - - assert_entity_is_in_map(fixture.entity, fixture.map.clone()); - - let proxy_request = join_result.unwrap(); - assert!(proxy_request.is_none()); - } - - #[rstest] - #[tokio::test] - async fn update_entity_map_handles_entity_not_found(fixture: TestFixture) { - // Setup - let non_existent_id = String::from("fooid"); - - { - let mut map = fixture.map.lock().unwrap(); - map.insert(non_existent_id.clone(), None); - } - - // Test - let update_result = fixture - .adapter - .update_entity_map(fixture.map.clone(), fixture.sender) - .await; - let join_result = fixture.listener_handler.await; - - // Verify - assert!(update_result.is_ok()); - assert!(join_result.is_ok()); - - assert_entry_is_in_map((non_existent_id, None), fixture.map.clone()); - - let proxy_request = join_result.unwrap(); - assert!(proxy_request.is_none()); - } -} diff --git a/contracts/src/entity.rs b/contracts/src/entity.rs index 5d68235b..28c36912 100644 --- a/contracts/src/entity.rs +++ b/contracts/src/entity.rs @@ -6,9 +6,6 @@ use serde::{Deserialize, Serialize}; use crate::provider_proxy::OperationKind; -pub type EntityID = String; -pub type ProviderURI = String; - /// Represents an entity #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] pub struct Entity { @@ -30,3 +27,16 @@ pub struct Entity { /// The protocol to use to contact this entity pub protocol: String, } + +impl Default for Entity { + fn default() -> Self { + Self { + id: Default::default(), + name: Default::default(), + uri: Default::default(), + description: Default::default(), + operation: OperationKind::Get, + protocol: Default::default(), + } + } +} diff --git a/contracts/src/lib.rs b/contracts/src/lib.rs index 72093c5a..b9eafe95 100644 --- a/contracts/src/lib.rs +++ b/contracts/src/lib.rs @@ -10,3 +10,4 @@ pub mod entity; pub mod mapping_client; pub mod provider_proxy; pub mod provider_proxy_request; +pub mod signal; diff --git 
a/contracts/src/mapping_client.rs b/contracts/src/mapping_client.rs index 17a7b1ff..b12eda1d 100644 --- a/contracts/src/mapping_client.rs +++ b/contracts/src/mapping_client.rs @@ -13,7 +13,7 @@ use crate::digital_twin_map_entry::DigitalTwinMapEntry; #[async_trait] pub trait MappingClient { /// Creates a new instance of a MappingClient with default settings - fn create_new() -> Result, MappingClientError> + fn create_new() -> Result where Self: Sized; @@ -74,7 +74,7 @@ pub struct SendInventoryResponse {} #[derive(Debug, Serialize, Deserialize)] pub struct GetMappingRequest {} -/// A responsewith a mapping +/// A response with a mapping #[derive(Debug, Serialize, Deserialize)] pub struct GetMappingResponse { /// The map diff --git a/contracts/src/provider_proxy_request.rs b/contracts/src/provider_proxy_request.rs index c965e52a..ab2dafd7 100644 --- a/contracts/src/provider_proxy_request.rs +++ b/contracts/src/provider_proxy_request.rs @@ -4,23 +4,32 @@ use tokio::sync::mpsc::UnboundedSender; -use crate::{ - entity::{EntityID, ProviderURI}, - provider_proxy::OperationKind, -}; +use crate::provider_proxy::OperationKind; pub type Protocol = String; /// Represents a provider proxy selector request kind #[derive(Debug)] pub enum ProviderProxySelectorRequestKind { /// Creates or updates a provider's proxy - CreateOrUpdateProviderProxy(EntityID, ProviderURI, Protocol, OperationKind), + CreateOrUpdateProviderProxy { + /// The id of the entity to link to the proxy + entity_id: String, + /// The uri of the provider to proxy + uri: String, + /// The communication protocol of the provider to proxy + protocol: Protocol, + /// The operation of the provider to proxy + operation: OperationKind, + }, - /// Get an entity's - GetEntityValue(EntityID), + /// Get an entity's value + GetEntityValue { entity_id: String }, } +/// A client for sending requests to the `ProviderProxySelector` +#[derive(Clone)] pub struct ProviderProxySelectorRequestSender { + /// The communication channel for 
the `ProviderProxySelector` tx_provider_proxy_selector_request: UnboundedSender, } @@ -46,9 +55,15 @@ impl ProviderProxySelectorRequestSender { pub fn send_request_to_provider_proxy_selector( &self, request: ProviderProxySelectorRequestKind, - ) { + ) -> Result<(), ProviderProxySelectorRequestSenderError> { self.tx_provider_proxy_selector_request .send(request) - .expect("rx_provider_proxy_selector_request is dropped."); + .map_err(ProviderProxySelectorRequestSenderError::receiver_dropped) + } +} + +proc_macros::error! { + ProviderProxySelectorRequestSenderError { + ReceiverDropped, } } diff --git a/contracts/src/signal.rs b/contracts/src/signal.rs new file mode 100644 index 00000000..2421aeef --- /dev/null +++ b/contracts/src/signal.rs @@ -0,0 +1,76 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. +// SPDX-License-Identifier: MIT + +use std::collections::HashMap; + +use crate::{conversion::Conversion, entity::Entity}; + +/// Conveys information about a signal, its current state, and how the data should be emitted +#[derive(Clone, Debug, Default, PartialEq)] +pub struct Signal { + /// The signal id. In most cases, this should be the same as the source id + pub id: String, + /// The signal's current value, if it's been set + pub value: Option, + /// The signal's source entity information + pub source: Entity, + /// The signal's target mapping information + pub target: Target, + /// The signal's emission metadata + pub emission: Emission, +} + +/// A partial signal representation used in the signal store's sync API +#[derive(Clone, Debug, Default, PartialEq)] +pub struct SignalPatch { + /// The signal id. 
In most cases, this should be the same as the source id + pub id: String, + /// The signal's source entity information + pub source: Entity, + /// The signal's target mapping information + pub target: Target, + /// The signal's emission metadata + pub emission_policy: EmissionPolicy, +} + +/// A signal's target mapping information +#[derive(Clone, Debug, Default, PartialEq)] +pub struct Target { + /// Metadata that will be passed to the cloud adapter to perform the mapping + pub metadata: HashMap, +} + +/// Metadata about a signal's emission +#[derive(Clone, Debug, Default, PartialEq)] +pub struct Emission { + /// The emission policy + pub policy: EmissionPolicy, + /// The time until the signal's next emission + /// Note that the default for this value is 0, which the emitter will interpret as ready to emit ASAP + pub next_emission_ms: u64, + /// The last emitted value + pub last_emitted_value: Option, +} + +/// A signal's emission policy +#[derive(Clone, Debug, Default, PartialEq)] +pub struct EmissionPolicy { + /// The interval at which the signal should be emitted + pub interval_ms: u64, + /// Indicates whether the signal data should only be emitted if the value has changed + pub emit_only_if_changed: bool, + /// A conversion to apply to the signal before emission + pub conversion: Conversion, +} + +impl From for SignalPatch { + fn from(value: Signal) -> Self { + Self { + id: value.id, + source: value.source, + target: value.target, + emission_policy: value.emission.policy, + } + } +} diff --git a/digital_twin_adapters/ibeji_adapter/src/ibeji_adapter.rs b/digital_twin_adapters/ibeji_adapter/src/ibeji_adapter.rs index 6be43428..8e63e4c6 100644 --- a/digital_twin_adapters/ibeji_adapter/src/ibeji_adapter.rs +++ b/digital_twin_adapters/ibeji_adapter/src/ibeji_adapter.rs @@ -2,14 +2,7 @@ // Licensed under the MIT license. 
// SPDX-License-Identifier: MIT -use std::{ - collections::HashMap, - fs, - path::Path, - str::FromStr, - sync::{Arc, Mutex}, - time::Duration, -}; +use std::{fs, path::Path, str::FromStr, time::Duration}; use async_trait::async_trait; use core_protobuf_data_access::invehicle_digital_twin::v1::{ @@ -27,9 +20,8 @@ use freyja_contracts::{ DigitalTwinAdapter, DigitalTwinAdapterError, GetDigitalTwinProviderRequest, GetDigitalTwinProviderResponse, }, - entity::{Entity, EntityID}, + entity::Entity, provider_proxy::OperationKind, - provider_proxy_request::ProviderProxySelectorRequestSender, }; const GET_OPERATION: &str = "Get"; @@ -80,7 +72,7 @@ impl IbejiAdapter { #[async_trait] impl DigitalTwinAdapter for IbejiAdapter { /// Creates a new instance of a DigitalTwinAdapter with default settings - fn create_new() -> Result, DigitalTwinAdapterError> { + fn create_new() -> Result { let settings_content = fs::read_to_string(Path::new(env!("OUT_DIR")).join(CONFIG_FILE)).unwrap(); let settings: Settings = serde_json::from_str(settings_content.as_str()).unwrap(); @@ -134,7 +126,7 @@ impl DigitalTwinAdapter for IbejiAdapter { }) .unwrap(); - Ok(Box::new(Self { client })) + Ok(Self { client }) } /// Gets entity access information @@ -149,6 +141,7 @@ impl DigitalTwinAdapter for IbejiAdapter { let request = tonic::Request::new(FindByIdRequest { id: entity_id.clone(), }); + let response = self .client .clone() @@ -204,29 +197,8 @@ impl DigitalTwinAdapter for IbejiAdapter { uri: endpoint.uri, protocol: endpoint.protocol, }; - Ok(GetDigitalTwinProviderResponse { entity }) - } - /// Run as a client to the in-vehicle digital twin provider - /// - /// # Arguments - /// - `entity_map`: shared map of entity ID to entity information - /// - `sleep_interval`: the interval in milliseconds between finding the access info of entities - /// - `provider_proxy_selector_request_sender`: sends requests to the provider proxy selector - async fn run( - &self, - entity_map: Arc>>>, - sleep_interval: 
Duration, - provider_proxy_selector_request_sender: Arc, - ) -> Result<(), DigitalTwinAdapterError> { - loop { - self.update_entity_map( - entity_map.clone(), - provider_proxy_selector_request_sender.clone(), - ) - .await?; - tokio::time::sleep(sleep_interval).await; - } + Ok(GetDigitalTwinProviderResponse { entity }) } } diff --git a/digital_twin_adapters/in_memory_mock_digital_twin_adapter/src/in_memory_mock_digital_twin_adapter.rs b/digital_twin_adapters/in_memory_mock_digital_twin_adapter/src/in_memory_mock_digital_twin_adapter.rs index 8c6ffa4b..5a5ca24d 100644 --- a/digital_twin_adapters/in_memory_mock_digital_twin_adapter/src/in_memory_mock_digital_twin_adapter.rs +++ b/digital_twin_adapters/in_memory_mock_digital_twin_adapter/src/in_memory_mock_digital_twin_adapter.rs @@ -2,18 +2,14 @@ // Licensed under the MIT license. // SPDX-License-Identifier: MIT -use std::{collections::HashMap, fs, path::Path, sync::Arc, sync::Mutex, time::Duration}; +use std::{fs, path::Path}; use async_trait::async_trait; use crate::config::EntityConfig; -use freyja_contracts::{ - digital_twin_adapter::{ - DigitalTwinAdapter, DigitalTwinAdapterError, DigitalTwinAdapterErrorKind, - GetDigitalTwinProviderRequest, GetDigitalTwinProviderResponse, - }, - entity::{Entity, EntityID}, - provider_proxy_request::ProviderProxySelectorRequestSender, +use freyja_contracts::digital_twin_adapter::{ + DigitalTwinAdapter, DigitalTwinAdapterError, DigitalTwinAdapterErrorKind, + GetDigitalTwinProviderRequest, GetDigitalTwinProviderResponse, }; const CONFIG_FILE: &str = "config.json"; @@ -54,9 +50,8 @@ impl InMemoryMockDigitalTwinAdapter { #[async_trait] impl DigitalTwinAdapter for InMemoryMockDigitalTwinAdapter { /// Creates a new instance of a DigitalTwinAdapter with default settings - fn create_new() -> Result, DigitalTwinAdapterError> { + fn create_new() -> Result { Self::from_config_file(Path::new(env!("OUT_DIR")).join(CONFIG_FILE)) - .map(|r| Box::new(r) as _) } /// Gets the entity information 
based on the request @@ -75,35 +70,13 @@ impl DigitalTwinAdapter for InMemoryMockDigitalTwinAdapter { }) .ok_or(DigitalTwinAdapterErrorKind::EntityNotFound.into()) } - - /// Run as a client to the mock in-vehicle digital twin provider - /// - /// # Arguments - /// - `entity_map`: shared map of entity ID to entity information - /// - `sleep_interval`: the interval in milliseconds between finding the access info of entities - /// - `provider_proxy_selector_request_sender`: sends requests to the provider proxy selector - async fn run( - &self, - entity_map: Arc>>>, - sleep_interval: Duration, - provider_proxy_selector_request_sender: Arc, - ) -> Result<(), DigitalTwinAdapterError> { - loop { - self.update_entity_map( - entity_map.clone(), - provider_proxy_selector_request_sender.clone(), - ) - .await?; - tokio::time::sleep(sleep_interval).await; - } - } } #[cfg(test)] mod in_memory_mock_digital_twin_adapter_tests { use super::*; - use freyja_contracts::provider_proxy::OperationKind; + use freyja_contracts::{entity::Entity, provider_proxy::OperationKind}; #[test] fn from_config_file_returns_err_on_nonexistent_file() { diff --git a/digital_twin_adapters/mock_digital_twin_adapter/src/mock_digital_twin_adapter.rs b/digital_twin_adapters/mock_digital_twin_adapter/src/mock_digital_twin_adapter.rs index c3e296c8..f5ea852f 100644 --- a/digital_twin_adapters/mock_digital_twin_adapter/src/mock_digital_twin_adapter.rs +++ b/digital_twin_adapters/mock_digital_twin_adapter/src/mock_digital_twin_adapter.rs @@ -2,9 +2,6 @@ // Licensed under the MIT license. 
// SPDX-License-Identifier: MIT -use std::collections::HashMap; -use std::sync::{Arc, Mutex}; -use std::time::Duration; use std::{fs, path::Path}; use async_trait::async_trait; @@ -16,8 +13,7 @@ use freyja_contracts::digital_twin_adapter::{ DigitalTwinAdapter, DigitalTwinAdapterError, GetDigitalTwinProviderRequest, GetDigitalTwinProviderResponse, }; -use freyja_contracts::entity::{Entity, EntityID}; -use freyja_contracts::provider_proxy_request::ProviderProxySelectorRequestSender; + use mock_digital_twin::ENTITY_QUERY_PATH; /// Mocks a Digital Twin Adapter that calls the mocks/mock_digital_twin @@ -59,15 +55,13 @@ impl MockDigitalTwinAdapter { #[async_trait] impl DigitalTwinAdapter for MockDigitalTwinAdapter { /// Creates a new instance of a MockDigitalTwinAdapter - fn create_new() -> Result, DigitalTwinAdapterError> { + fn create_new() -> Result { let settings_content = fs::read_to_string(Path::new(env!("OUT_DIR")).join(CONFIG_FILE)) .map_err(DigitalTwinAdapterError::io)?; let settings: Settings = serde_json::from_str(settings_content.as_str()) .map_err(DigitalTwinAdapterError::deserialize)?; - Ok(Box::new(Self::with_uri( - &settings.base_uri_for_digital_twin_server, - ))) + Ok(Self::with_uri(&settings.base_uri_for_digital_twin_server)) } /// Gets the info of an entity via an HTTP request. 
@@ -82,6 +76,7 @@ impl DigitalTwinAdapter for MockDigitalTwinAdapter { "{}{ENTITY_QUERY_PATH}{}", self.base_uri_for_digital_twin_server, request.entity_id ); + self.client .get(&target) .send() @@ -93,26 +88,4 @@ impl DigitalTwinAdapter for MockDigitalTwinAdapter { .await .map_err(DigitalTwinAdapterError::deserialize) } - - /// Run as a client to contact the mocks/mock_digital_twin for finding an entity's info - /// - /// # Arguments - /// - `entity_map`: shared map of entity ID to entity information - /// - `sleep_interval`: the interval in milliseconds between finding the access info of entities - /// - `provider_proxy_selector_request_sender`: sends requests to the provider proxy selector - async fn run( - &self, - entity_map: Arc>>>, - sleep_interval: Duration, - provider_proxy_selector_request_sender: Arc, - ) -> Result<(), DigitalTwinAdapterError> { - loop { - self.update_entity_map( - entity_map.clone(), - provider_proxy_selector_request_sender.clone(), - ) - .await?; - tokio::time::sleep(sleep_interval).await; - } - } } diff --git a/docs/design/README.md b/docs/design/README.md index 0b5ce8cf..c7805788 100644 --- a/docs/design/README.md +++ b/docs/design/README.md @@ -28,7 +28,7 @@ The cartographer is the core component responsible for managing the digital twin ### Emitter -The emitter is the core component responsible for actually emitting data. The emitter supports intervals at a per-signal level to enable signals to have different requirements on how often they are synced with the cloud. +The emitter is the core component responsible for actually emitting data. The emitter supports intervals at a per-signal level to enable signals to have different requirements on how often they are synced with the cloud. Note that once a signal is added to the mapping and picked up by the cartographer, it can take up to `min(`*`I`*`)` before the signal is emitted, where *`I`* is the set of intervals for signals already being tracked. 
![Digital Twin Sequence Diagram](../diagrams/digital_twin_to_emitter_sequence.svg) diff --git a/freyja/Cargo.toml b/freyja/Cargo.toml index 7a47a33a..16b6e22d 100644 --- a/freyja/Cargo.toml +++ b/freyja/Cargo.toml @@ -22,4 +22,4 @@ time = { workspace = true } tokio = { workspace = true } [dev-dependencies] -in-memory-mock-cloud-adapter = { path = "../cloud_adapters/in_memory_mock_cloud_adapter"} \ No newline at end of file +mockall = { workspace = true } \ No newline at end of file diff --git a/freyja/src/cartographer.rs b/freyja/src/cartographer.rs index 809bc68c..904a4d25 100644 --- a/freyja/src/cartographer.rs +++ b/freyja/src/cartographer.rs @@ -2,52 +2,79 @@ // Licensed under the MIT license. // SPDX-License-Identifier: MIT -use std::sync::{Arc, Mutex}; -use std::{collections::HashMap, time::Duration}; +use std::sync::Arc; +use std::time::Duration; -use log::info; +use freyja_common::signal_store::SignalStore; +use log::{info, warn}; -use freyja_contracts::digital_twin_map_entry::DigitalTwinMapEntry; -use freyja_contracts::mapping_client::{CheckForWorkRequest, GetMappingRequest, MappingClient}; +use freyja_contracts::{ + conversion::Conversion, + digital_twin_adapter::{ + DigitalTwinAdapter, DigitalTwinAdapterError, DigitalTwinAdapterErrorKind, + GetDigitalTwinProviderRequest, + }, + mapping_client::{CheckForWorkRequest, GetMappingRequest, MappingClient}, + provider_proxy_request::{ + ProviderProxySelectorRequestKind, ProviderProxySelectorRequestSender, + }, + signal::{EmissionPolicy, SignalPatch, Target}, +}; /// Manages mappings from the mapping service -pub struct Cartographer { - /// The mapping shared with the emitter - map: Arc>>, +pub struct Cartographer { + /// The shared signal store + signals: Arc, /// The mapping client - mapping_client: Box, + mapping_client: TMappingClient, + + /// The digital twin client + digital_twin_client: TDigitalTwinAdapter, + + /// The provider proxy selector client + provider_proxy_selector_client: 
ProviderProxySelectorRequestSender, /// The mapping service polling interval poll_interval: Duration, } -impl Cartographer { +impl + Cartographer +{ /// Create a new instance of a Cartographer /// /// # Arguments - /// - `map`: the shared map instance to update with new changes + /// - `signals`: the shared signal store /// - `mapping_client`: the client for the mapping service + /// - `digital_twin_client`: the client for the digital twin service + /// - `provider_proxy_selector_client`: the client for the provider proxy selector /// - `poll_interval`: the interval at which the cartographer should poll for changes pub fn new( - map: Arc>>, - mapping_client: Box, + signals: Arc, + mapping_client: TMappingClient, + digital_twin_client: TDigitalTwinAdapter, + provider_proxy_selector_client: ProviderProxySelectorRequestSender, poll_interval: Duration, ) -> Self { Self { - map, + signals, mapping_client, + digital_twin_client, + provider_proxy_selector_client, poll_interval, } } /// Run the cartographer. This will do the following in a loop: /// - /// 1. Check to see if the mapping service has more work. If not, skip to step 5 - /// 2. Send the new inventory to the mapping service - /// 3. Get the new mapping from the mapping service - /// 4. Update the shared map for the emitter - /// 5. Sleep until the next iteration + /// 1. Check to see if the mapping service has more work. If not, skip to the last step + /// 1. ~~Send the new inventory to the mapping service~~ + /// 1. Get the new mapping from the mapping service + /// 1. Query the digital twin service for entity information + /// 1. Create or update provider proxies for the new entities + /// 1. Update the signal store with the new data + /// 1. 
Sleep until the next iteration pub async fn run(&self) -> Result<(), Box> { loop { let mapping_client_result = self @@ -55,25 +82,302 @@ impl Cartographer { .check_for_work(CheckForWorkRequest {}) .await; - let mapping_work = mapping_client_result?; + if mapping_client_result.is_err() { + let error = mapping_client_result.err().unwrap(); + log::error!( + "Failed to check for mapping work; will try again later. Error: {error}" + ); + continue; + } - if mapping_work.has_work { + if mapping_client_result.unwrap().has_work { info!("Cartographer detected mapping work"); // TODO: will this notion of checking and sending inventory exist? // self.mapping_client.send_inventory(SendInventoryRequest { inventory: self.known_providers.clone() }).await?; - // TODO: waiting/retry logic? - let mapping_response = self - .mapping_client - .get_mapping(GetMappingRequest {}) - .await?; + let patches_result = self.get_mapping_as_signal_patches().await; + if patches_result.is_err() { + let error = patches_result.err().unwrap(); + log::error!("Falied to get mapping from mapping client: {error}"); + continue; + } + + let mut patches = patches_result.unwrap(); + let mut failed_signals = Vec::new(); + + for patch in patches.iter_mut() { + // Many of the API calls in populate_entity are probably unnecessary, but this code gets executed + // infrequently enough that the sub-optimal performance is not a major concern. 
+ // A bulk find_by_id API in the digital twin service would make this a non-issue + let populate_result = self.populate_source(patch).await; + + if populate_result.is_err() { + match populate_result + .err() + .unwrap() + .downcast::() + { + Ok(e) if e.kind() == DigitalTwinAdapterErrorKind::EntityNotFound => { + warn!("Entity not found for signal {}", patch.id); + } + Ok(e) => { + log::error!("Error fetching entity for signal {}: {e:?}", patch.id); + } + Err(e) => { + log::error!("Error fetching entity for signal {}: {e:?}", patch.id); + } + } + + failed_signals.push(patch.id.clone()); + } + } - // Note: since this a sync lock, do not introduce async calls to this block without switching to an async lock! - *self.map.lock().unwrap() = mapping_response.map; + self.signals.sync( + patches + .into_iter() + .filter(|s| !failed_signals.contains(&s.id)), + ); } tokio::time::sleep(self.poll_interval).await; } } + + /// Gets the mapping from the mapping client and returns a corresponding list of signal patches. + async fn get_mapping_as_signal_patches( + &self, + ) -> Result, Box> { + Ok(self + .mapping_client + .get_mapping(GetMappingRequest {}) + .await? + .map + .into_iter() + .map(|(id, entry)| SignalPatch { + id, + // this gets populated later, set to default for now + source: Default::default(), + target: Target { + metadata: entry.target, + }, + emission_policy: EmissionPolicy { + interval_ms: entry.interval_ms, + emit_only_if_changed: entry.emit_on_change, + conversion: Conversion::default(), + }, + }) + .collect()) + } + + /// Populates the source of the provided signal with data retrieved from the digital twin service. + /// This will also create or update a proxy to handle incoming requests from the provider. 
+ /// + /// Arguments + /// - `signal`: The signal patch to update + async fn populate_source( + &self, + signal: &mut SignalPatch, + ) -> Result<(), Box> { + signal.source = self + .digital_twin_client + .find_by_id(GetDigitalTwinProviderRequest { + entity_id: signal.id.clone(), + }) + .await? + .entity; + + let request = ProviderProxySelectorRequestKind::CreateOrUpdateProviderProxy { + entity_id: signal.source.id.clone(), + uri: signal.source.uri.clone(), + protocol: signal.source.protocol.clone(), + operation: signal.source.operation.clone(), + }; + + self.provider_proxy_selector_client + .send_request_to_provider_proxy_selector(request) + .map_err(|e| format!("Error sending request to provider proxy selector: {e}"))?; + + Ok(()) + } +} + +#[cfg(test)] +mod cartographer_tests { + use std::collections::HashMap; + + use super::*; + + use async_trait::async_trait; + use mockall::*; + + use freyja_contracts::{ + digital_twin_adapter::{DigitalTwinAdapterError, GetDigitalTwinProviderResponse}, + digital_twin_map_entry::DigitalTwinMapEntry, + entity::Entity, + mapping_client::{ + CheckForWorkResponse, GetMappingResponse, MappingClientError, SendInventoryRequest, + SendInventoryResponse, + }, + provider_proxy::OperationKind, + }; + use tokio::sync::mpsc; + + mock! { + pub DigitalTwinAdapterImpl {} + + #[async_trait] + impl DigitalTwinAdapter for DigitalTwinAdapterImpl { + fn create_new() -> Result + where + Self: Sized; + + async fn find_by_id( + &self, + request: GetDigitalTwinProviderRequest, + ) -> Result; + } + } + + mock! 
{ + pub MappingClientImpl {} + + #[async_trait] + impl MappingClient for MappingClientImpl { + fn create_new() -> Result + where + Self: Sized; + + async fn check_for_work( + &self, + request: CheckForWorkRequest, + ) -> Result; + + async fn send_inventory( + &self, + inventory: SendInventoryRequest, + ) -> Result; + + async fn get_mapping( + &self, + request: GetMappingRequest, + ) -> Result; + } + } + + #[tokio::test] + async fn get_mapping_as_signals_returns_correct_value() { + const ID: &str = "testid"; + let test_map_entry = DigitalTwinMapEntry { + source: ID.to_string(), + target: HashMap::new(), + interval_ms: 42, + conversion: Default::default(), + emit_on_change: true, + }; + + let test_map_entry_clone = test_map_entry.clone(); + + let mut mock_mapping_client = MockMappingClientImpl::new(); + mock_mapping_client + .expect_get_mapping() + .returning(move |_| { + Ok(GetMappingResponse { + map: [(ID.to_string(), test_map_entry_clone.clone())] + .into_iter() + .collect(), + }) + }); + + let (tx, _) = mpsc::unbounded_channel::(); + let provider_proxy_selector_client = ProviderProxySelectorRequestSender::new(tx); + let uut = Cartographer { + signals: Arc::new(SignalStore::new()), + mapping_client: mock_mapping_client, + digital_twin_client: MockDigitalTwinAdapterImpl::new(), + provider_proxy_selector_client, + poll_interval: Duration::from_secs(1), + }; + + let result = uut.get_mapping_as_signal_patches().await; + assert!(result.is_ok()); + let mut signals = result.unwrap(); + assert_eq!(signals.len(), 1); + let signal = signals.pop().unwrap(); + assert_eq!(signal.id, ID.to_string()); + assert_eq!(signal.target.metadata, test_map_entry.target); + assert_eq!( + signal.emission_policy.interval_ms, + test_map_entry.interval_ms + ); + assert_eq!( + signal.emission_policy.emit_only_if_changed, + test_map_entry.emit_on_change + ); + assert_eq!(signal.emission_policy.conversion, test_map_entry.conversion); + } + + #[tokio::test] + async fn populate_source_tests() { + 
const ID: &str = "testid"; + let test_entity = Entity { + id: ID.to_string(), + name: Some("name".to_string()), + uri: "uri".to_string(), + description: Some("description".to_string()), + operation: OperationKind::Get, + protocol: "protocol".to_string(), + }; + + let test_signal_patch = &mut SignalPatch { + id: ID.to_string(), + ..Default::default() + }; + + let test_entity_clone = test_entity.clone(); + + let mut mock_dt_adapter = MockDigitalTwinAdapterImpl::new(); + mock_dt_adapter.expect_find_by_id().returning(move |_| { + Ok(GetDigitalTwinProviderResponse { + entity: test_entity_clone.clone(), + }) + }); + + let (tx, mut rx) = mpsc::unbounded_channel::(); + let provider_proxy_selector_client = ProviderProxySelectorRequestSender::new(tx); + let listener_handler = tokio::spawn(async move { rx.recv().await }); + + let uut = Cartographer { + signals: Arc::new(SignalStore::new()), + mapping_client: MockMappingClientImpl::new(), + digital_twin_client: mock_dt_adapter, + provider_proxy_selector_client, + poll_interval: Duration::from_secs(1), + }; + + let result = uut.populate_source(test_signal_patch).await; + let join_result = listener_handler.await; + + assert!(result.is_ok()); + assert!(join_result.is_ok()); + assert_eq!(test_signal_patch.source, test_entity); + + let proxy_request = join_result.unwrap(); + assert!(proxy_request.is_some()); + let proxy_request = proxy_request.as_ref().unwrap(); + match proxy_request { + ProviderProxySelectorRequestKind::CreateOrUpdateProviderProxy { + entity_id, + uri, + protocol, + operation, + } => { + assert_eq!(*entity_id, test_entity.id); + assert_eq!(*uri, test_entity.uri); + assert_eq!(*protocol, test_entity.protocol); + assert_eq!(*operation, test_entity.operation); + } + _ => panic!("Unexpected proxy request kind: {proxy_request:?}"), + } + } } diff --git a/freyja/src/emitter.rs b/freyja/src/emitter.rs index 88639df2..93a0ed71 100644 --- a/freyja/src/emitter.rs +++ b/freyja/src/emitter.rs @@ -2,1101 +2,617 @@ // Licensed 
under the MIT license. // SPDX-License-Identifier: MIT -use std::{ - collections::{hash_map::Entry::Occupied, hash_map::Entry::Vacant, HashMap}, - sync::{Arc, Mutex}, - time::Duration, -}; +use std::{cmp::min, sync::Arc, time::Duration}; use crossbeam::queue::SegQueue; -use log::{debug, info}; +use log::{info, warn}; use time::OffsetDateTime; use tokio::time::sleep; +use freyja_common::signal_store::SignalStore; use freyja_contracts::{ - cloud_adapter::{CloudAdapter, CloudAdapterError, CloudMessageRequest, CloudMessageResponse}, - digital_twin_map_entry::DigitalTwinMapEntry, - entity::{Entity, EntityID}, + cloud_adapter::{CloudAdapter, CloudMessageRequest, CloudMessageResponse}, provider_proxy::SignalValue, provider_proxy_request::{ ProviderProxySelectorRequestKind, ProviderProxySelectorRequestSender, }, + signal::Signal, }; -type TimeMsLeft = u64; - -/// Data emitter for the digital twin sync project -/// Emits sensor data at regular intervals as defined by the map -pub struct Emitter { - /// The mapping is shared with the cartographer - dt_map_entries: Arc>>, +const DEFAULT_SLEEP_INTERVAL_MS: u64 = 1000; - /// Shared map of entity ID to entity info - entity_map: Arc>>>, +/// Emits sensor data at regular intervals as configured in the store +pub struct Emitter { + /// The shared signal store + signals: Arc, /// The cloud adapter used to emit data to the cloud - cloud_adapter: Box, + cloud_adapter: TCloudAdapter, /// Sends requests to the provider proxy selector - provider_proxy_selector_request_sender: Arc, + provider_proxy_selector_client: ProviderProxySelectorRequestSender, /// Shared message queue for obtaining new signal values signal_values_queue: Arc>, } -/// The payload for an emission -#[derive(Debug, Clone)] -pub(crate) struct EmissionPayload { - /// The time left until emission - time_ms_left: u64, - - // States whether this signal has been emitted - has_signal_been_emitted: bool, - - // Stores state of previous signal value emitted - 
previous_signal_value: Option, - - // States whether this provider value has changed - did_signal_value_change: bool, -} - -impl Emitter { - /// Creates a new instance of emitter +impl Emitter { + /// Creates a new instance of the Emitter /// /// # Arguments - /// - `dt_map_entries`: shared hashmap with Cartographer for storing provider mapping info + /// - `signals`: the shared signal store /// - `cloud_adapter`: the cloud adapter used to emit to the cloud - /// - `entity_map`: shared map of entity ID to entity information /// - `provider_proxy_selector_request_sender`: sends requests to the provider proxy selector /// - `signal_values_queue`: queue for receiving signal values pub fn new( - dt_map_entries: Arc>>, - cloud_adapter: Box, - entity_map: Arc>>>, - provider_proxy_selector_request_sender: Arc, + signals: Arc, + cloud_adapter: TCloudAdapter, + provider_proxy_selector_client: ProviderProxySelectorRequestSender, signal_values_queue: Arc>, ) -> Self { Self { - dt_map_entries, + signals, cloud_adapter, - entity_map, - provider_proxy_selector_request_sender, + provider_proxy_selector_client, signal_values_queue, } } - /// Updates the emissions hashmap when there are changes to the dt_map_entries hashmap - /// - /// # Arguments - /// - `emissions_map`: hashmap for storing signals to emit - /// - `dt_map_entries`: cloned hashmap for storing mapping information - fn update_emissions( - emissions_map: &mut HashMap, - dt_map_entries: &HashMap, - ) { - for (signal_id, entry) in dt_map_entries.iter() { - // Insert into emissions if unique mapping - emissions_map - .entry(signal_id.clone()) - .or_insert_with(|| EmissionPayload { - time_ms_left: entry.interval_ms, - has_signal_been_emitted: false, - previous_signal_value: None, - did_signal_value_change: false, - }); - } - - // If we have a mapping that doesn't exist anymore in map, - // but still exists in emissions, we remove it - emissions_map.retain(|signal_id, _| dt_map_entries.contains_key(signal_id)); - } - - /// 
Creates a key to the shared entity_map when there are new emissions - /// - /// # Arguments - /// - `emissions_map`: hashmap for storing signals to emit - async fn create_key_to_entity_map( - &self, - emissions_map: &HashMap, - ) -> Result<(), String> { - // Check to see if an emission entry requires a key - let mut entity_map = self.entity_map.lock().unwrap(); - - // Create a key if a key-value doesn't exist for entity_map - for (signal_id, _) in emissions_map.iter() { - entity_map.entry(signal_id.clone()).or_insert_with(|| None); - } - - // Entity Map may be outdated due to change in emissions, - // so only retain the entities for signals that need to be emitted - entity_map.retain(|signal_id, _| emissions_map.contains_key(signal_id)); - - Ok(()) - } - /// Execute this Emitter pub async fn run(&self) -> Result<(), Box> { - let mut min_interval_ms = None; - let mut emissions_map: HashMap = HashMap::new(); - let mut signal_values_map = HashMap::new(); - + let mut sleep_interval = u64::MAX; loop { - let mut dt_map_entries_clone: HashMap; - // Note: since this a sync lock, do not introduce async calls to this block without switching to an async lock! - { - let dt_map_entries = self.dt_map_entries.lock().unwrap(); - dt_map_entries_clone = dt_map_entries.clone(); - } - - Self::update_emissions(&mut emissions_map, &dt_map_entries_clone); + self.update_signal_values(); - self.create_key_to_entity_map(&emissions_map).await?; + // Update the emission times and get the list of all signals. + // This is performed as a single operation to minimize the impact of changes to the signal set during processing. + // Note that the first time the loop is executed sleep_interval will still be u64::MAX, + // which will have the effect of force-emitting every signal in the store (though typically there won't be anything). + // After that, the intervals will be no more than the max configured interval. 
+ let signals = self + .signals + .update_emission_times_and_get_all(sleep_interval); - self.emit_signals( - &mut emissions_map, - &mut signal_values_map, - &mut dt_map_entries_clone, - &mut min_interval_ms, - ) - .await?; + sleep_interval = self.emit_data(signals).await?; - let sleep_interval = min_interval_ms.unwrap_or(1000); info!("Checking for next emission in {sleep_interval}ms\n"); sleep(Duration::from_millis(sleep_interval)).await; } } - /// Updates the signal values map - /// - /// # Arguments - /// - `signal_values_map`: a map for storing a signal value - fn update_signal_values_map(&self, signal_values_map: &mut HashMap>) { + /// Updates the signal values map. + /// This will eventually get removed and provider proxies will update the store directly, + /// but it remains temporarily to scope work down a bit. + fn update_signal_values(&self) { while !self.signal_values_queue.is_empty() { let SignalValue { entity_id, value } = self.signal_values_queue.pop().unwrap(); - signal_values_map.insert(entity_id, Some(value)); - } - } - - /// Applies conversion implicitly to a signal_value and sends it to the cloud - /// - /// # Arguments - /// - `dt_map_entry`: the digital twin map entry to send to cloud - /// - `signal_value`: the signal value - async fn send_to_cloud( - &self, - dt_map_entry: &DigitalTwinMapEntry, - signal_value: String, - ) -> Result { - let mut converted = signal_value.clone(); - if let Ok(value) = signal_value.parse::() { - converted = dt_map_entry.conversion.apply(value).to_string(); - } - - info!( - "Digital Twin Instance {:?}: {}", - dt_map_entry.target, converted - ); - info!("\t(from {}: {})", dt_map_entry.source, signal_value); - - let cloud_message = CloudMessageRequest { - cloud_signal: dt_map_entry.target.clone(), - signal_value: converted, - signal_timestamp: OffsetDateTime::now_utc().to_string(), - }; - let response = self.cloud_adapter.send_to_cloud(cloud_message).await?; - Ok(response) - } - - /// Gets a signal value from a 
signal_values hashmap - /// - /// # Arguments - /// - `signal_values_map`: hashmap for storing signal values - /// - `signal_id`: the signal id - fn get_signal_value_with_entity_id( - signal_values_map: &mut HashMap>, - signal_id: &str, - ) -> Option { - match signal_values_map.entry(signal_id.to_string()) { - Occupied(value) => value.get().clone(), - Vacant(_) => None, - } - } - - /// Gets a digital twin map entry from a dt_map_entries hashmap - /// - /// # Arguments - /// - `dt_map_entries`: hashmap for storing mapping information - fn get_dt_map_entry( - dt_map_entries: &mut HashMap, - signal_id: &str, - ) -> Option { - match dt_map_entries.entry(signal_id.to_string()) { - Occupied(dt_map_entry) => Some(dt_map_entry.get().clone()), - Vacant(_) => None, - } - } - - /// Updates did_signal_value_change for each emission in emissions_map - /// - /// # Arguments - /// - `emissions_map`: hashmap for storing signals to emit - /// - `signal_values_map`: hashmap for storing signal values - fn update_emissions_signal_value_change_status( - emissions_map: &mut HashMap, - signal_values_map: &mut HashMap>, - ) { - for (signal_id, emission_payload) in emissions_map.iter_mut() { - let previous_signal_value = emission_payload.previous_signal_value.clone(); - let current_signal_value = - Self::get_signal_value_with_entity_id(signal_values_map, signal_id); - - // Update this emission payload inside emissions if - // previous value is different - if previous_signal_value != current_signal_value { - debug!("{signal_id}, previous signal value {previous_signal_value:?}, current signal_value {current_signal_value:?}"); - emission_payload.previous_signal_value = current_signal_value; - emission_payload.did_signal_value_change = true; - } else { - emission_payload.did_signal_value_change = false; + if self.signals.set_value(entity_id.clone(), value).is_none() { + warn!("Attempted to update signal {entity_id} but it wasn't found") } } } - /// Emit signals based on the dt_map_entries 
mapping and the emissions + /// Performs data emissions of the provided signals. + /// Returns the amount of time that the main emitter loop should sleep before the next iteration. /// /// # Arguments - /// - `emissions_map`: hashmap for storing signals to emit - /// - `signal_values_map`: hashmap for storing signal values - /// - `dt_map_entries`: hashmap for storing mapping information - /// - `min_interval_ms`: minimum interval in milliseconds - async fn emit_signals( - &self, - emissions_map: &mut HashMap, - signal_values_map: &mut HashMap>, - dt_map_entries: &mut HashMap, - min_interval_ms: &mut Option, - ) -> Result, Box> { - // Signals that match the minimum interval - let emissions_with_min_interval = - Self::get_emissions_with_min_interval(emissions_map, min_interval_ms); - - if !emissions_map.is_empty() { + /// - `signals`: The set of signals to emit + async fn emit_data(&self, signals: Vec) -> Result { + if signals.is_empty() { + Ok(DEFAULT_SLEEP_INTERVAL_MS) + } else { info!("********************BEGIN EMISSION********************"); - } - - // Update our signal values cache - // After getting signal values, check if a value has changed in - // emissions and update the did_signal_value_change field for each emission - self.update_signal_values_map(signal_values_map); - Self::update_emissions_signal_value_change_status(emissions_map, signal_values_map); - - // Emit the signal values that have min_interval_ms - for signal_id in emissions_with_min_interval.keys() { - let signal_value = - match Self::get_signal_value_with_entity_id(signal_values_map, signal_id) { - Some(signal_value) => signal_value, - None => { - info!( - "No signal value for {} in our cache. 
Skipping emission for this signal.", - signal_id - ); - - // Send request again for a new signal value since our cache - // doesn't contain a signal value for this signal - let request = ProviderProxySelectorRequestKind::GetEntityValue( - String::from(signal_id), - ); + let mut sleep_interval = u64::MAX; + + for signal in signals { + if signal.emission.next_emission_ms > 0 { + // Don't emit this signal on this iteration, but use the value to update the sleep interval + sleep_interval = min(sleep_interval, signal.emission.next_emission_ms); + + // Go to next signal + continue; + } else { + // We will emit this signal since the timer is expired, + // but need to also check the new interval in case it's smaller than the remaining intervals + sleep_interval = min(sleep_interval, signal.emission.policy.interval_ms); + } - self.provider_proxy_selector_request_sender - .send_request_to_provider_proxy_selector(request); - continue; - } + // Submit a request for a new value for the next iteration. + // This approach to requesting signal values introduces an inherent delay in uploading data + // of signal.emission.policy.interval_ms and needs to be revisited. + let request = ProviderProxySelectorRequestKind::GetEntityValue { + entity_id: signal.id.clone(), }; - // Acquire the signal entry for the signal id. - // This is guaranteed to exist since emissions_with_min_interval is a subset of dt_map_entries. 
- let dt_map_entry = Self::get_dt_map_entry(dt_map_entries, signal_id).unwrap(); - - // Get a new value for this current entity in emission - // if an entity only supports subscribe this call shouldn't do anything - let request = ProviderProxySelectorRequestKind::GetEntityValue(String::from(signal_id)); - self.provider_proxy_selector_request_sender - .send_request_to_provider_proxy_selector(request); - - // Checks if this digital twin map entry requires emit on change - // then handles the digital twin map entry for emitting on change - // if the signal has been emitted and its value did not change then skip emission - if Self::should_emit_signal_for_emit_on_change(&dt_map_entry, emissions_map) { - Self::set_emission_has_signal_been_emitted( - emissions_map, - &dt_map_entry.source, - false, - )?; - } else { - info!("Signal {} did not change and has already been emitted. Skipping emission for this signal.", dt_map_entry.source); - continue; - } + let proxy_result = self + .provider_proxy_selector_client + .send_request_to_provider_proxy_selector(request) + .map_err(EmitterError::provider_proxy_error); - self.send_to_cloud(&dt_map_entry, signal_value).await?; - Self::set_emission_has_signal_been_emitted(emissions_map, signal_id, true)? - } + if proxy_result.is_err() { + log::error!("Error submitting request for signal value while processing signal {}: {:?}", signal.id, proxy_result.err()); + } - if !emissions_map.is_empty() { - info!("*********************END EMISSION*********************"); - } + if signal.value.is_none() { + info!( + "No signal value for {} in our cache. 
Skipping emission for this signal.", + signal.id + ); - // Emitted the previous signals, so now we update the time left based on new signals in emissions - Self::update_emission_payload_time_left(dt_map_entries, emissions_map, min_interval_ms); - debug!("Signals in the emissions after updating {emissions_map:?}"); + // Go to the next signal + continue; + } - Ok(emissions_with_min_interval) - } + if signal.emission.policy.emit_only_if_changed + && signal.emission.last_emitted_value.is_some() + && signal.value == signal.emission.last_emitted_value + { + info!("Signal {} did not change and has already been emitted. Skipping emission for this signal.", signal.id); - /// Updates the time left in an emission payload - /// and sets a new min_interval_ms - /// - /// # Arguments - /// - `dt_map_entries`: hashmap for storing signals - /// - `emissions_map`: hashmap for storing signals to emit - /// - `min_interval_ms`: minimum interval in milliseconds - fn update_emission_payload_time_left( - dt_map_entries: &mut HashMap, - emissions_map: &mut HashMap, - min_interval_ms: &mut Option, - ) { - let mut new_min_interval: Option = None; - - // Update the intervals based on time elapsed - for (signal_id, emission_payload) in emissions_map.iter_mut() { - if let Some(interval) = min_interval_ms { - if emission_payload.time_ms_left >= *interval { - emission_payload.time_ms_left -= *interval; + // Go to next signal + continue; } - // Reset the time_ms_left - if emission_payload.time_ms_left == 0 { - let Some(dt_map_entry) = Self::get_dt_map_entry(dt_map_entries, signal_id) - else { - continue; - }; - emission_payload.time_ms_left = dt_map_entry.interval_ms; + let signal_id = signal.id.clone(); + let send_to_cloud_result = self.send_to_cloud(signal).await; + + if send_to_cloud_result.is_err() { + log::error!( + "Error sending data to cloud while processing signal {}: {:?}", + signal_id, + send_to_cloud_result.err() + ); } } - new_min_interval = match new_min_interval { - Some(interval) 
=> Some(TimeMsLeft::min(interval, emission_payload.time_ms_left)), - None => Some(emission_payload.time_ms_left), - } - } + info!("*********************END EMISSION*********************"); - *min_interval_ms = new_min_interval; + Ok(sleep_interval) + } } - /// Gets the emissions with all entries that have the minimum sleep interval + /// Applies a conversion implicitly to a signal value and sends it to the cloud /// /// # Arguments - /// - `emissions_map`: hashmap for storing signals to emit - /// - `min_interval_ms`: minimum interval in milliseconds - fn get_emissions_with_min_interval( - emissions_map: &HashMap, - min_interval_ms: &Option, - ) -> HashMap { - emissions_map + /// - `signal`: The signal to emit + async fn send_to_cloud(&self, signal: Signal) -> Result { + let value = signal + .value .clone() - .into_iter() - .filter(|(_, emission_payload)| match min_interval_ms { - Some(interval) => emission_payload.time_ms_left == *interval, - None => true, - }) - .map(|(signal_id, emission_payload)| (signal_id, emission_payload.time_ms_left)) - .collect() - } + // This error case should actually be unreachable, but always good to check! 
+ .ok_or::(EmitterErrorKind::SignalValueEmpty.into())?; - /// Determines if a signal should emit if it requries to be emit on change - /// - /// # Arguments - /// - `dt_map_entry`: digital twin map entry - /// - `emissions_map`: hashmap for storing signals - fn should_emit_signal_for_emit_on_change( - dt_map_entry: &DigitalTwinMapEntry, - emissions_map: &mut HashMap, - ) -> bool { - // Skip if the digital twin map entry doesn't require emit on change - if !dt_map_entry.emit_on_change { - return true; - } + let converted = value.parse::().map_or(value.clone(), |v| { + signal.emission.policy.conversion.apply(v).to_string() + }); - match emissions_map.entry(dt_map_entry.source.clone()) { - // If this digital_twin map entry requires to only emit on change, - // then if it's signal value did not change and signal has already been emitted - // we skip this signal's emission - Occupied(emission_payload) => { - if !emission_payload.get().did_signal_value_change - && emission_payload.get().has_signal_been_emitted - { - return false; - } - true - } - Vacant(_) => false, - } - } + info!( + "Digital Twin Instance {:?}: {}", + signal.target.metadata, converted + ); - /// Sets an entry value in emission to has_signal_been_emitted - /// - /// # Arguments - /// - `emissions_map`: hashmap for storing signals - /// - `signal_id`: the signal id to set the value to has_signal_been_emitted - /// - `has_signal_been_emitted`: states whether the signal has been emitted - fn set_emission_has_signal_been_emitted( - emissions_map: &mut HashMap, - signal_id: &str, - has_signal_been_emitted: bool, - ) -> Result<(), EmitterError> { - match emissions_map.entry(String::from(signal_id)) { - Occupied(emission_payload) => { - emission_payload.into_mut().has_signal_been_emitted = has_signal_been_emitted; - Ok(()) - } - Vacant(_) => { - let error_message = format!("Cannot set has_signal_been_emitted = {has_signal_been_emitted} for {signal_id}"); - 
Err(EmitterError::cannot_set_signal_to_emitted(error_message)) - } - } + info!("\t(from {}: {:?})", signal.source.id, signal.value); + + let cloud_message = CloudMessageRequest { + cloud_signal: signal.target.metadata.clone(), + signal_value: converted, + signal_timestamp: OffsetDateTime::now_utc().to_string(), + }; + + let response = self + .cloud_adapter + .send_to_cloud(cloud_message) + .await + .map_err(EmitterError::cloud_error)?; + + // We don't set the last emitted value to the converted value so that we can meaningfully compare + // this value with the value coming directly from the signal. + self.signals.set_last_emitted_value(signal.id, value); + + Ok(response) } } proc_macros::error! { EmitterError { - CannotSetSignalToEmitted + SignalValueEmpty, + ProviderProxyError, + CloudError, } } #[cfg(test)] mod emitter_tests { use super::*; + use async_trait::async_trait; + use freyja_contracts::{ + cloud_adapter::{CloudAdapterError, CloudAdapterErrorKind}, + signal::{Emission, EmissionPolicy}, + }; + use mockall::*; + use tokio::sync::mpsc; - use std::collections::HashSet; + mock! 
{ + pub CloudAdapterImpl {} - use core::panic; - use tokio::sync::mpsc; + #[async_trait] + impl CloudAdapter for CloudAdapterImpl { + fn create_new() -> Result + where + Self: Sized; - use freyja_contracts::conversion::Conversion; - use in_memory_mock_cloud_adapter::in_memory_mock_cloud_adapter::InMemoryMockCloudAdapter; - - mod fixture { - use super::*; - /// Fixture for struct Emitter - /// - /// # Arguments - /// - /// - `digital_twin_map_entries`: vector of digital twin map entries, where each entry contains mapping info - /// - `emissions_map`: Hashmap where the key is a map entry to emit, and the value is the time left in milliseconds to emit - /// - `min_interval_ms`: minimum interval in milliseconds from a list of signal intervals - pub(crate) struct EmitterFixture { - pub digital_twin_map_entries: HashMap, - pub emissions_map: HashMap, - pub min_interval_ms: Option, + async fn send_to_cloud( + &self, + cloud_message: CloudMessageRequest, + ) -> Result; } + } - impl EmitterFixture { - /// Setting up the Emitter Test Fixture - /// - /// # Arguments - /// - /// - `intervals_ms`: vector of requested signal intervals in milliseconds to emit - pub fn setup(intervals_ms: Vec) -> Self { - let digital_twin_map_entries = generate_digital_twin_map_entries(&intervals_ms); - let emissions_map = insert_digital_map_entries(&digital_twin_map_entries); - let min_interval_ms = None; - let mut operations = HashSet::new(); - operations.insert(String::from("Subscribe")); - - EmitterFixture { - digital_twin_map_entries, - emissions_map, - min_interval_ms, - } - } + #[tokio::test] + async fn emit_data_returns_default_on_empty_input() { + let (tx, _) = mpsc::unbounded_channel::(); + let provider_proxy_selector_client = ProviderProxySelectorRequestSender::new(tx); + let uut = Emitter { + signals: Arc::new(SignalStore::new()), + cloud_adapter: MockCloudAdapterImpl::new(), + provider_proxy_selector_client, + signal_values_queue: Arc::new(SegQueue::new()), + }; - pub fn 
set_digital_twin_map_entry_to_emit_on_change( - dt_map_entry: &mut DigitalTwinMapEntry, - ) { - dt_map_entry.emit_on_change = true; - } + let result = uut.emit_data(vec![]).await; - pub fn check_signal_time_left(&self, signal_id: &str, time_left_ms: u64) -> bool { - self.emissions_map - .get(&String::from(signal_id)) - .unwrap() - .time_ms_left - == time_left_ms - } - } + assert!(result.is_ok()); + assert_eq!(result.unwrap(), DEFAULT_SLEEP_INTERVAL_MS); } - fn generate_digital_twin_map_entries( - intervals_ms: &[u64], - ) -> HashMap { - let mut digital_twin_map_entries: HashMap = HashMap::new(); - - for (index, interval_ms) in intervals_ms.iter().enumerate() { - let (source, target_id) = (format!("test_{index}"), format!("test_target_{index}")); - - let mut target = HashMap::new(); - target.insert(target_id, String::new()); - digital_twin_map_entries.insert( - source.clone(), - DigitalTwinMapEntry { - source, - target, - interval_ms: *interval_ms, - conversion: Conversion::None, - emit_on_change: false, - }, - ); - } + #[tokio::test] + async fn emit_data_handles_nonzero_next_emission_time() { + const NEXT_EMISSION_MS: u64 = 42; - digital_twin_map_entries - } + let (tx, mut rx) = mpsc::unbounded_channel::(); + let provider_proxy_selector_client = ProviderProxySelectorRequestSender::new(tx); + let listener_handler = tokio::spawn(async move { rx.recv().await }); - fn insert_digital_map_entries( - digital_map_entries: &HashMap, - ) -> HashMap { - let mut emissions_map: HashMap = HashMap::new(); - - for entry in digital_map_entries.values() { - let emission_payload = EmissionPayload { - time_ms_left: entry.interval_ms, - has_signal_been_emitted: false, - previous_signal_value: None, - did_signal_value_change: false, - }; - emissions_map.insert(entry.source.clone(), emission_payload); - } + let mut mock_cloud_adapter = MockCloudAdapterImpl::new(); + mock_cloud_adapter.expect_send_to_cloud().never(); - emissions_map - } + let mut uut = Emitter { + signals: 
Arc::new(SignalStore::new()), + cloud_adapter: mock_cloud_adapter, + provider_proxy_selector_client, + signal_values_queue: Arc::new(SegQueue::new()), + }; - #[test] - fn update_emission_payload_time_left_test() { - let intervals_ms = vec![2000, 3000]; - let mut emitter_fixture = fixture::EmitterFixture::setup(intervals_ms.clone()); - let mut signal_ids = emitter_fixture - .emissions_map - .keys() - .cloned() - .collect::>(); - // Sort since collecting keys as a vector is random - signal_ids.sort(); - - let [test_1_entry, test_2_entry] = &signal_ids[..] else { - panic! {"Cannot get digital twin entries"} + let test_signal = Signal { + emission: Emission { + next_emission_ms: NEXT_EMISSION_MS, + ..Default::default() + }, + ..Default::default() }; - Emitter::update_emission_payload_time_left( - &mut emitter_fixture.digital_twin_map_entries, - &mut emitter_fixture.emissions_map, - &mut intervals_ms.into_iter().min(), - ); + let result = uut.emit_data(vec![test_signal]).await; - println!("{test_1_entry}"); - assert_eq!( - emitter_fixture - .emissions_map - .get(test_1_entry) - .unwrap() - .time_ms_left, - 2000 - ); - assert_eq!( - emitter_fixture - .emissions_map - .get(test_2_entry) - .unwrap() - .time_ms_left, - 1000 - ); + uut.cloud_adapter.checkpoint(); - Emitter::update_emission_payload_time_left( - &mut emitter_fixture.digital_twin_map_entries, - &mut emitter_fixture.emissions_map, - &mut None, - ); - assert_eq!( - emitter_fixture - .emissions_map - .get(test_1_entry) - .unwrap() - .time_ms_left, - 2000 - ); - assert_eq!( - emitter_fixture - .emissions_map - .get(test_2_entry) - .unwrap() - .time_ms_left, - 1000 - ); + assert!(result.is_ok()); + assert_eq!(result.unwrap(), NEXT_EMISSION_MS); + + // The behavior of listener_handler here is unclear, it either + // will still be running because it's never received anything + // or will be completed with result None because the sender was dropped. 
+ // I've seen both things happen before, so check just in case to avoid indefinite waiting + if listener_handler.is_finished() { + let listener_result = listener_handler.await; + assert!(listener_result.is_ok()); + let listener_result = listener_result.unwrap(); + assert!(listener_result.is_none()); + } } - #[test] - fn update_emissions_did_signal_value_change_test() { - let intervals_ms = vec![2000, 3000]; - let mut emitter_fixture = fixture::EmitterFixture::setup(intervals_ms); - let mut signal_values_map: HashMap> = emitter_fixture - .emissions_map - .keys() - .map(|signal_id| (signal_id.clone(), Some(String::from("0.00")))) - .collect(); - - Emitter::update_emissions_signal_value_change_status( - &mut emitter_fixture.emissions_map, - &mut signal_values_map, - ); + #[tokio::test] + async fn emit_data_handles_zero_next_emission_time() { + const INTERVAL: u64 = 42; + + let (tx, mut rx) = mpsc::unbounded_channel::(); + let provider_proxy_selector_client = ProviderProxySelectorRequestSender::new(tx); + let listener_handler = tokio::spawn(async move { rx.recv().await }); + + let mut mock_cloud_adapter = MockCloudAdapterImpl::new(); + mock_cloud_adapter + .expect_send_to_cloud() + .once() + .returning(|_| Ok(CloudMessageResponse {})); + + let mut uut = Emitter { + signals: Arc::new(SignalStore::new()), + cloud_adapter: mock_cloud_adapter, + provider_proxy_selector_client, + signal_values_queue: Arc::new(SegQueue::new()), + }; - for emission_payload in emitter_fixture.emissions_map.values() { - assert!(emission_payload.did_signal_value_change); - } + let test_signal = Signal { + value: Some("foo".to_string()), + emission: Emission { + next_emission_ms: 0, + policy: EmissionPolicy { + interval_ms: INTERVAL, + ..Default::default() + }, + ..Default::default() + }, + ..Default::default() + }; - Emitter::update_emissions_signal_value_change_status( - &mut emitter_fixture.emissions_map, - &mut signal_values_map, - ); - for emission_payload in 
emitter_fixture.emissions_map.values() { - assert!(!emission_payload.did_signal_value_change); - } - } + let result = uut.emit_data(vec![test_signal]).await; + let listener_result = listener_handler.await; - #[test] - fn check_emit_for_emit_on_change_test() { - let intervals_ms = vec![2000, 3000]; - let mut emitter_fixture = fixture::EmitterFixture::setup(intervals_ms); - - for (signal_id, dt_map_entry) in emitter_fixture.digital_twin_map_entries.iter_mut() { - assert!( - !emitter_fixture - .emissions_map - .get(signal_id) - .unwrap() - .has_signal_been_emitted - ); - assert!( - !emitter_fixture - .emissions_map - .get(signal_id) - .unwrap() - .did_signal_value_change - ); - assert!(!dt_map_entry.emit_on_change); - let mut result = Emitter::should_emit_signal_for_emit_on_change( - dt_map_entry, - &mut emitter_fixture.emissions_map, - ); - assert!(result); - - dt_map_entry.emit_on_change = true; - emitter_fixture - .emissions_map - .get_mut(signal_id) - .unwrap() - .did_signal_value_change = true; - assert!( - !emitter_fixture - .emissions_map - .get(signal_id) - .unwrap() - .has_signal_been_emitted - ); - result = Emitter::should_emit_signal_for_emit_on_change( - dt_map_entry, - &mut emitter_fixture.emissions_map, - ); - assert!(result); - - emitter_fixture - .emissions_map - .get_mut(signal_id) - .unwrap() - .did_signal_value_change = false; - emitter_fixture - .emissions_map - .get_mut(signal_id) - .unwrap() - .has_signal_been_emitted = true; - result = Emitter::should_emit_signal_for_emit_on_change( - dt_map_entry, - &mut emitter_fixture.emissions_map, - ); - assert!(!result); - } + uut.cloud_adapter.checkpoint(); + + assert!(result.is_ok()); + assert_eq!(result.unwrap(), INTERVAL); + assert!(listener_result.is_ok()); + let listener_result = listener_result.unwrap(); + assert!(listener_result.is_some()); } #[tokio::test] - async fn emit_two_signals_test() { - let intervals_ms = vec![3, 2]; - let mut time_index = 0; - let mut emitter_fixture = 
fixture::EmitterFixture::setup(intervals_ms); + async fn emit_data_doesnt_emit_when_value_empty() { + const INTERVAL: u64 = 42; - // The setup below is for instantiating an emitter - let map: Arc>> = - Arc::new(Mutex::new(HashMap::new())); + let (tx, mut rx) = mpsc::unbounded_channel::(); + let provider_proxy_selector_client = ProviderProxySelectorRequestSender::new(tx); + let listener_handler = tokio::spawn(async move { rx.recv().await }); - let cloud_adapter: Box = - InMemoryMockCloudAdapter::create_new().unwrap(); + let mut mock_cloud_adapter = MockCloudAdapterImpl::new(); + mock_cloud_adapter.expect_send_to_cloud().never(); - let entity_map: Arc>>> = - Arc::new(Mutex::new(HashMap::new())); + let mut uut = Emitter { + signals: Arc::new(SignalStore::new()), + cloud_adapter: mock_cloud_adapter, + provider_proxy_selector_client, + signal_values_queue: Arc::new(SegQueue::new()), + }; - let (tx_provider_proxy_selector_request, _rx_provider_proxy_selector_request) = - mpsc::unbounded_channel::(); - let provider_proxy_selector_request_sender = Arc::new( - ProviderProxySelectorRequestSender::new(tx_provider_proxy_selector_request), - ); - let signal_values_queue: Arc> = Arc::new(SegQueue::new()); + let test_signal = Signal { + value: None, + emission: Emission { + next_emission_ms: 0, + policy: EmissionPolicy { + interval_ms: INTERVAL, + ..Default::default() + }, + ..Default::default() + }, + ..Default::default() + }; - let emitter = Emitter::new( - map, - cloud_adapter, - entity_map, - provider_proxy_selector_request_sender, - signal_values_queue, - ); - for _ in 0..10 { - let mut signal_values_map = HashMap::new(); - let emissions_with_min_interval = emitter - .emit_signals( - &mut emitter_fixture.emissions_map, - &mut signal_values_map, - &mut emitter_fixture.digital_twin_map_entries, - &mut emitter_fixture.min_interval_ms, - ) - .await - .unwrap(); - let time_sleep_duration = &emitter_fixture.min_interval_ms; - - // Signal emitting scenarios - match time_index { - 
// Initially we send all the signals and sleep for 2ms - 0 => { - assert_eq!(emissions_with_min_interval.len(), 2); - assert_eq!(time_sleep_duration.unwrap(), 2) - } - // Check if we're only emitting only 1 signal and that we're only sleeping for 1 ms - 2 | 3 | 8 | 9 => { - assert_eq!(emissions_with_min_interval.len(), 1); - assert_eq!(time_sleep_duration.unwrap(), 1); - } - // Check if we're emitting 1 signal and that we're only sleeping for 2ms - 4 | 10 => { - assert_eq!(emissions_with_min_interval.len(), 1); - assert_eq!(time_sleep_duration.unwrap(), 2); - } - // Check if we're emitting 2 signals and that we're only sleeping for 2ms - 6 => { - assert_eq!(emissions_with_min_interval.len(), 2); - assert_eq!(time_sleep_duration.unwrap(), 2); - } - _ => {} - } - // Simulate sleep - time_index += time_sleep_duration.unwrap(); - } + let result = uut.emit_data(vec![test_signal]).await; + let listener_result = listener_handler.await; + + uut.cloud_adapter.checkpoint(); + + assert!(result.is_ok()); + assert_eq!(result.unwrap(), INTERVAL); + assert!(listener_result.is_ok()); + let listener_result = listener_result.unwrap(); + assert!(listener_result.is_some()); } #[tokio::test] - async fn emit_multiple_signals_test() { - let intervals_ms = vec![4, 7, 9]; - let mut time_index = 0; - let mut emitter_fixture = fixture::EmitterFixture::setup(intervals_ms); + async fn emit_data_doesnt_emit_when_value_not_changed() { + const INTERVAL: u64 = 42; - // The setup below is for instantiating an emitter - let map: Arc>> = - Arc::new(Mutex::new(HashMap::new())); + let (tx, mut rx) = mpsc::unbounded_channel::(); + let provider_proxy_selector_client = ProviderProxySelectorRequestSender::new(tx); + let listener_handler = tokio::spawn(async move { rx.recv().await }); - let cloud_adapter: Box = - InMemoryMockCloudAdapter::create_new().unwrap(); + let mut mock_cloud_adapter = MockCloudAdapterImpl::new(); + mock_cloud_adapter.expect_send_to_cloud().never(); - let entity_map: Arc>>> = - 
Arc::new(Mutex::new(HashMap::new())); + let mut uut = Emitter { + signals: Arc::new(SignalStore::new()), + cloud_adapter: mock_cloud_adapter, + provider_proxy_selector_client, + signal_values_queue: Arc::new(SegQueue::new()), + }; - let (tx_provider_proxy_selector_request, _rx_provider_proxy_selector_request) = - mpsc::unbounded_channel::(); - let provider_proxy_selector_request_sender = Arc::new( - ProviderProxySelectorRequestSender::new(tx_provider_proxy_selector_request), - ); - let signal_values_queue: Arc> = Arc::new(SegQueue::new()); + let value = Some("foo".to_string()); + let test_signal = Signal { + value: value.clone(), + emission: Emission { + next_emission_ms: 0, + last_emitted_value: value, + policy: EmissionPolicy { + interval_ms: INTERVAL, + emit_only_if_changed: true, + ..Default::default() + }, + }, + ..Default::default() + }; - let emitter = Emitter::new( - map, - cloud_adapter, - entity_map, - provider_proxy_selector_request_sender, - signal_values_queue, - ); + let result = uut.emit_data(vec![test_signal]).await; + let listener_result = listener_handler.await; - for _ in 0..10 { - let mut signal_values_map = HashMap::new(); - let emissions_with_min_interval = emitter - .emit_signals( - &mut emitter_fixture.emissions_map, - &mut signal_values_map, - &mut emitter_fixture.digital_twin_map_entries, - &mut emitter_fixture.min_interval_ms, - ) - .await - .unwrap(); - let time_sleep_duration = &emitter_fixture.min_interval_ms; - - // Signal emitting scenarios - match time_index { - // Initially we send all the signals and sleep for 4 ms - 0 => { - assert_eq!(emissions_with_min_interval.len(), 3); - assert_eq!(time_sleep_duration.unwrap(), 4) - } - // Check if we're emitting 1 signal and that we're only sleeping for 3ms - 4 | 9 => { - assert_eq!(emissions_with_min_interval.len(), 1); - assert_eq!(time_sleep_duration.unwrap(), 3); - } - // Check if we're emitting 1 signal and that we're only sleeping for 1ms - 7 | 8 | 20 => { - 
assert_eq!(emissions_with_min_interval.len(), 1); - assert_eq!(time_sleep_duration.unwrap(), 1); - } - // Check if we're emitting 1 signal and that we're only sleeping for 2ms - // For time_index 12, 14, 16, 18 - (12..=18) if time_index % 2 == 0 => { - assert_eq!(emissions_with_min_interval.len(), 1); - assert_eq!(time_sleep_duration.unwrap(), 2); - } - _ => {} - } - // Simulate sleep - time_index += time_sleep_duration.unwrap(); - } + uut.cloud_adapter.checkpoint(); + + assert!(result.is_ok()); + assert_eq!(result.unwrap(), INTERVAL); + assert!(listener_result.is_ok()); + let listener_result = listener_result.unwrap(); + assert!(listener_result.is_some()); } - #[test] - fn all_signals_emit_on_change_value_change_test() { - let intervals_ms = vec![4, 7, 9, 2, 8]; - let mut digital_twin_map_entries = - fixture::EmitterFixture::setup(intervals_ms).digital_twin_map_entries; + #[tokio::test] + async fn emit_data_emits_when_value_changed() { + const INTERVAL: u64 = 42; + + let (tx, mut rx) = mpsc::unbounded_channel::(); + let provider_proxy_selector_client = ProviderProxySelectorRequestSender::new(tx); + let listener_handler = tokio::spawn(async move { rx.recv().await }); + + let mut mock_cloud_adapter = MockCloudAdapterImpl::new(); + mock_cloud_adapter + .expect_send_to_cloud() + .once() + .returning(|_| Ok(CloudMessageResponse {})); + + let mut uut = Emitter { + signals: Arc::new(SignalStore::new()), + cloud_adapter: mock_cloud_adapter, + provider_proxy_selector_client, + signal_values_queue: Arc::new(SegQueue::new()), + }; + + let test_signal = Signal { + value: Some("foo".to_string()), + emission: Emission { + next_emission_ms: 0, + last_emitted_value: Some("bar".to_string()), + policy: EmissionPolicy { + interval_ms: INTERVAL, + emit_only_if_changed: true, + ..Default::default() + }, + }, + ..Default::default() + }; - // Set all entries to emit on change - for (_, dt_map_entry) in digital_twin_map_entries.iter_mut() { - 
fixture::EmitterFixture::set_digital_twin_map_entry_to_emit_on_change(dt_map_entry); - assert!(dt_map_entry.emit_on_change); - } + let result = uut.emit_data(vec![test_signal]).await; + let listener_result = listener_handler.await; - let mut emissions: HashMap = HashMap::new(); - for (signal_id, _) in digital_twin_map_entries.iter_mut() { - let emission_payload = EmissionPayload { - time_ms_left: 0, - has_signal_been_emitted: false, - previous_signal_value: None, - did_signal_value_change: false, - }; - emissions.insert(signal_id.clone(), emission_payload); - assert!( - Emitter::set_emission_has_signal_been_emitted(&mut emissions, signal_id, true) - .is_ok() - ); - let payload = emissions.get(signal_id).unwrap(); - assert!(payload.has_signal_been_emitted); - } + uut.cloud_adapter.checkpoint(); - // Test to see if signals with emit on change policy are correctly handled - for (signal_id, dt_map_entry) in digital_twin_map_entries.iter() { - assert!(!Emitter::should_emit_signal_for_emit_on_change( - dt_map_entry, - &mut emissions - )); - let emission_payload = emissions.get_mut(&signal_id.clone()).unwrap(); - emission_payload.did_signal_value_change = true; - assert!(Emitter::should_emit_signal_for_emit_on_change( - dt_map_entry, - &mut emissions - )); - } + assert!(result.is_ok()); + assert_eq!(result.unwrap(), INTERVAL); + assert!(listener_result.is_ok()); + let listener_result = listener_result.unwrap(); + assert!(listener_result.is_some()); } - #[test] - fn some_signal_emit_on_change_value_change_test() { - // Initial setup - let intervals_ms = vec![4, 7, 9, 2, 8]; - let mut digital_twin_map_entries = - fixture::EmitterFixture::setup(intervals_ms).digital_twin_map_entries; - - for (counter, (_, dt_map_entry)) in digital_twin_map_entries.iter_mut().enumerate() { - // Set every even counter number to emit on change - if counter % 2 == 0 { - fixture::EmitterFixture::set_digital_twin_map_entry_to_emit_on_change(dt_map_entry); - assert!(dt_map_entry.emit_on_change); - 
} - } + #[tokio::test] + async fn emit_data_emits_when_last_value_empty() { + const INTERVAL: u64 = 42; + + let (tx, mut rx) = mpsc::unbounded_channel::(); + let provider_proxy_selector_client = ProviderProxySelectorRequestSender::new(tx); + let listener_handler = tokio::spawn(async move { rx.recv().await }); + + let mut mock_cloud_adapter = MockCloudAdapterImpl::new(); + mock_cloud_adapter + .expect_send_to_cloud() + .once() + .returning(|_| Ok(CloudMessageResponse {})); + + let mut uut = Emitter { + signals: Arc::new(SignalStore::new()), + cloud_adapter: mock_cloud_adapter, + provider_proxy_selector_client, + signal_values_queue: Arc::new(SegQueue::new()), + }; - let mut emissions: HashMap = HashMap::new(); - for (signal_id, dt_map_entry) in digital_twin_map_entries.iter_mut() { - let emission_payload = EmissionPayload { - time_ms_left: 0, - has_signal_been_emitted: dt_map_entry.emit_on_change, - previous_signal_value: None, - did_signal_value_change: false, - }; - emissions.insert(signal_id.clone(), emission_payload); - assert!( - Emitter::set_emission_has_signal_been_emitted(&mut emissions, signal_id, true) - .is_ok() - ); - let payload = emissions.get(signal_id).unwrap(); - assert!(payload.has_signal_been_emitted); - } + let test_signal = Signal { + value: Some("foo".to_string()), + emission: Emission { + next_emission_ms: 0, + last_emitted_value: None, + policy: EmissionPolicy { + interval_ms: INTERVAL, + emit_only_if_changed: true, + ..Default::default() + }, + }, + ..Default::default() + }; - // Test to see signals are correctly handled - for (counter, (signal_id, dt_map_entry)) in digital_twin_map_entries.iter().enumerate() { - if counter % 2 != 0 { - assert!(Emitter::should_emit_signal_for_emit_on_change( - dt_map_entry, - &mut emissions - )); - } else { - assert!(!Emitter::should_emit_signal_for_emit_on_change( - dt_map_entry, - &mut emissions - )); - } - let emission_payload = emissions.get_mut(&signal_id.clone()).unwrap(); - 
emission_payload.did_signal_value_change = true; - assert!(Emitter::should_emit_signal_for_emit_on_change( - dt_map_entry, - &mut emissions - )); - } + let result = uut.emit_data(vec![test_signal]).await; + let listener_result = listener_handler.await; + + uut.cloud_adapter.checkpoint(); + + assert!(result.is_ok()); + assert_eq!(result.unwrap(), INTERVAL); + assert!(listener_result.is_ok()); + let listener_result = listener_result.unwrap(); + assert!(listener_result.is_some()); } - #[test] - fn signal_emit_on_change_no_value_change_err_expect_test() { - let intervals_ms = vec![4, 7, 9, 2, 8]; + #[tokio::test] + async fn cloud_adapter_error_doesnt_prevent_further_emission_attempts() { + let (tx, _) = mpsc::unbounded_channel::(); + let provider_proxy_selector_client = ProviderProxySelectorRequestSender::new(tx); + + let mut mock_cloud_adapter = MockCloudAdapterImpl::new(); + mock_cloud_adapter + .expect_send_to_cloud() + .times(2) + .returning(|_| Err(CloudAdapterErrorKind::Unknown.into())); + + let mut uut = Emitter { + signals: Arc::new(SignalStore::new()), + cloud_adapter: mock_cloud_adapter, + provider_proxy_selector_client, + signal_values_queue: Arc::new(SegQueue::new()), + }; - let mut digital_twin_map_entries = - fixture::EmitterFixture::setup(intervals_ms).digital_twin_map_entries; + let test_signal = Signal { + value: Some("foo".to_string()), + ..Default::default() + }; - for (_, dt_map_entry) in digital_twin_map_entries.iter_mut() { - fixture::EmitterFixture::set_digital_twin_map_entry_to_emit_on_change(dt_map_entry); - } + let result = uut.emit_data(vec![test_signal.clone(), test_signal]).await; - let mut emissions: HashMap = HashMap::new(); - for (signal_id, dt_map_entry) in digital_twin_map_entries.iter_mut() { - let emission_payload = EmissionPayload { - time_ms_left: 0, - has_signal_been_emitted: dt_map_entry.emit_on_change, - previous_signal_value: None, - did_signal_value_change: false, - }; - emissions.insert(signal_id.clone(), emission_payload); - 
assert!( - Emitter::set_emission_has_signal_been_emitted(&mut emissions, signal_id, true) - .is_ok() - ); - let payload = emissions.get(signal_id).unwrap(); - assert!(payload.has_signal_been_emitted); - } + uut.cloud_adapter.checkpoint(); - for (_, dt_map_entry) in digital_twin_map_entries.iter() { - assert!(!Emitter::should_emit_signal_for_emit_on_change( - dt_map_entry, - &mut emissions - )); - } + assert!(result.is_ok()); } #[tokio::test] - async fn emit_multiple_signals_on_change_test() { - let intervals_ms = vec![4, 7, 9]; - let mut time_index = 0; - let mut emitter_fixture = fixture::EmitterFixture::setup(intervals_ms); - - // The setup below is for instantiating an emitter - let map: Arc>> = - Arc::new(Mutex::new(HashMap::new())); - - let cloud_adapter: Box = - InMemoryMockCloudAdapter::create_new().unwrap(); - let entity_map: Arc>>> = - Arc::new(Mutex::new(HashMap::new())); - - let (tx_provider_proxy_selector_request, _rx_provider_proxy_selector_request) = - mpsc::unbounded_channel::(); - let provider_proxy_selector_request_sender = Arc::new( - ProviderProxySelectorRequestSender::new(tx_provider_proxy_selector_request), - ); - let signal_values_queue: Arc> = Arc::new(SegQueue::new()); + async fn send_to_cloud_updates_signal_store() { + const ID: &str = "testid"; + const INTERVAL: u64 = 42; + + let (tx, _) = mpsc::unbounded_channel::(); + let provider_proxy_selector_client = ProviderProxySelectorRequestSender::new(tx); + + let mut mock_cloud_adapter = MockCloudAdapterImpl::new(); + mock_cloud_adapter + .expect_send_to_cloud() + .returning(|_| Ok(CloudMessageResponse {})); + + let test_signal = Signal { + id: ID.to_string(), + value: Some("foo".to_string()), + emission: Emission { + policy: EmissionPolicy { + interval_ms: INTERVAL, + ..Default::default() + }, + ..Default::default() + }, + ..Default::default() + }; - let emitter = Emitter::new( - map, - cloud_adapter, - entity_map, - provider_proxy_selector_request_sender, - signal_values_queue, - ); + let 
signals = SignalStore::new(); + signals.sync([test_signal.clone()].into_iter()); - let first_map_entry = emitter_fixture - .digital_twin_map_entries - .entry(String::from("test_0")) - .or_default(); - fixture::EmitterFixture::set_digital_twin_map_entry_to_emit_on_change(first_map_entry); - assert!(Emitter::set_emission_has_signal_been_emitted( - &mut emitter_fixture.emissions_map, - "test_0", - true - ) - .is_ok()); - for _ in 0..10 { - let mut signal_values_map = HashMap::new(); - let emissions_with_min_interval = emitter - .emit_signals( - &mut emitter_fixture.emissions_map, - &mut signal_values_map, - &mut emitter_fixture.digital_twin_map_entries, - &mut emitter_fixture.min_interval_ms, - ) - .await - .unwrap(); - let time_sleep_duration = &emitter_fixture.min_interval_ms; - - // Signal emitting scenarios - match time_index { - // Initially we send all the signals and sleep for 4 ms - 0 => { - assert_eq!(emissions_with_min_interval.len(), 3); - assert_eq!(time_sleep_duration.unwrap(), 4) - } - // Check if we're emitting 1 signal and that we're only sleeping for 3ms - 4 | 9 => { - if time_index == 4 { - assert!(emitter_fixture.check_signal_time_left("test_1", 3)) - }; - assert_eq!(emissions_with_min_interval.len(), 1); - assert_eq!(time_sleep_duration.unwrap(), 3); - } - // Check if we're emitting 1 signal and that we're only sleeping for 1ms - 7 | 8 | 20 => { - assert_eq!(emissions_with_min_interval.len(), 1); - assert_eq!(time_sleep_duration.unwrap(), 1); - } - // Check if we're emitting 1 signal and that we're only sleeping for 2ms - // For time_index 12, 14, 16, 18 - (12..=18) if time_index % 2 == 0 => { - assert_eq!(emissions_with_min_interval.len(), 1); - assert_eq!(time_sleep_duration.unwrap(), 2); - } - _ => {} - } - // Simulate sleep - time_index += time_sleep_duration.unwrap(); - } + let uut = Emitter { + signals: Arc::new(signals), + cloud_adapter: mock_cloud_adapter, + provider_proxy_selector_client, + signal_values_queue: Arc::new(SegQueue::new()), + 
}; + + let result = uut.send_to_cloud(test_signal).await; + + assert!(result.is_ok()); + + // Ideally the signal store should be mockable so we can just verify call count + let signal = uut.signals.get(&ID.to_string()); + assert!(signal.is_some()); + let signal = signal.unwrap(); + assert!(signal.emission.last_emitted_value.is_some()); + assert_eq!(signal.emission.next_emission_ms, INTERVAL); } } diff --git a/freyja/src/main.rs b/freyja/src/main.rs index f8208e59..ab18759c 100644 --- a/freyja/src/main.rs +++ b/freyja/src/main.rs @@ -5,13 +5,7 @@ mod cartographer; mod emitter; -use std::{ - collections::HashMap, - env, - str::FromStr, - sync::{Arc, Mutex}, - time::Duration, -}; +use std::{collections::HashMap, env, str::FromStr, sync::Arc, time::Duration}; use crossbeam::queue::SegQueue; use env_logger::Target; @@ -20,17 +14,18 @@ use tokio::sync::mpsc; use cartographer::Cartographer; use emitter::Emitter; -use freyja_contracts::provider_proxy_request::{ - ProviderProxySelectorRequestKind, ProviderProxySelectorRequestSender, -}; +use freyja_common::signal_store::SignalStore; use freyja_contracts::{ - cloud_adapter::CloudAdapter, digital_twin_adapter::DigitalTwinAdapter, - digital_twin_map_entry::DigitalTwinMapEntry, entity::*, mapping_client::MappingClient, + cloud_adapter::CloudAdapter, + digital_twin_adapter::DigitalTwinAdapter, + mapping_client::MappingClient, provider_proxy::SignalValue, + provider_proxy_request::{ + ProviderProxySelectorRequestKind, ProviderProxySelectorRequestSender, + }, }; -use provider_proxy_selector::provider_proxy_selector::ProviderProxySelector; - use freyja_deps::*; +use provider_proxy_selector::provider_proxy_selector::ProviderProxySelector; #[tokio::main] async fn main() -> Result<(), Box> { @@ -62,41 +57,33 @@ async fn main() -> Result<(), Box> { .target(Target::Stdout) .init(); - // Setup shared resources - let map: Arc>> = - Arc::new(Mutex::new(HashMap::new())); - let entity_map: Arc>>> = - Arc::new(Mutex::new(HashMap::new())); - - 
// Setup interfaces - let dt_adapter = DigitalTwinAdapterImpl::create_new().unwrap(); - let mapping_client = MappingClientImpl::create_new().unwrap(); - let cloud_adapter: Box = - CloudAdapterImpl::create_new().unwrap(); + let signal_store = Arc::new(SignalStore::new()); + let (tx_provider_proxy_selector_request, rx_provider_proxy_selector_request) = + mpsc::unbounded_channel::(); + let provider_proxy_selector_request_sender = + ProviderProxySelectorRequestSender::new(tx_provider_proxy_selector_request); // Setup cartographer let cartographer_poll_interval = Duration::from_secs(5); - let cartographer = Cartographer::new(map.clone(), mapping_client, cartographer_poll_interval); + let cartographer = Cartographer::new( + signal_store.clone(), + MappingClientImpl::create_new().unwrap(), + DigitalTwinAdapterImpl::create_new().unwrap(), + provider_proxy_selector_request_sender.clone(), + cartographer_poll_interval, + ); // Setup emitter let signal_values_queue: Arc> = Arc::new(SegQueue::new()); - let (tx_provider_proxy_selector_request, rx_provider_proxy_selector_request) = - mpsc::unbounded_channel::(); - let provider_proxy_selector_request_sender = Arc::new(ProviderProxySelectorRequestSender::new( - tx_provider_proxy_selector_request, - )); - let emitter = Emitter::new( - map, - cloud_adapter, - entity_map.clone(), + signal_store.clone(), + CloudAdapterImpl::create_new().unwrap(), provider_proxy_selector_request_sender.clone(), signal_values_queue.clone(), ); let provider_proxy_selector = ProviderProxySelector::new(); tokio::select! { - Err(e) = dt_adapter.run(entity_map, Duration::from_secs(5), provider_proxy_selector_request_sender) => { println!("[main] digital twin adapter terminated with error {e:?}"); Err(e)? 
}, Err(e) = cartographer.run() => { println!("[main] cartographer terminated with error {e:?}"); Err(e) }, Err(e) = emitter.run() => { println!("[main] emitter terminated with error {e:?}"); Err(e) }, Err(e) = provider_proxy_selector.run(rx_provider_proxy_selector_request, signal_values_queue) => { Err(e)? } diff --git a/mapping_clients/in_memory_mock_mapping_client/src/in_memory_mock_mapping_client.rs b/mapping_clients/in_memory_mock_mapping_client/src/in_memory_mock_mapping_client.rs index e8bca63c..df17c143 100644 --- a/mapping_clients/in_memory_mock_mapping_client/src/in_memory_mock_mapping_client.rs +++ b/mapping_clients/in_memory_mock_mapping_client/src/in_memory_mock_mapping_client.rs @@ -53,9 +53,8 @@ impl InMemoryMockMappingClient { #[async_trait] impl MappingClient for InMemoryMockMappingClient { /// Creates a new instance of an InMemoryMockMappingClient with default settings - fn create_new() -> Result, MappingClientError> { + fn create_new() -> Result { Self::from_config_file(Path::new(env!("OUT_DIR")).join(CONFIG_FILE)) - .map(|r| Box::new(r) as _) } /// Checks for any additional work that the mapping service requires. 
diff --git a/mapping_clients/mock_mapping_service_client/Cargo.toml b/mapping_clients/mock_mapping_service_client/Cargo.toml index 06b710b3..c8d14aa7 100644 --- a/mapping_clients/mock_mapping_service_client/Cargo.toml +++ b/mapping_clients/mock_mapping_service_client/Cargo.toml @@ -14,4 +14,4 @@ freyja-common = { workspace = true } freyja-contracts = { workspace = true } reqwest = { workspace = true } serde = { workspace = true } -serde_json = { workspace = true} +serde_json = { workspace = true } diff --git a/mapping_clients/mock_mapping_service_client/src/mock_mapping_service_client.rs b/mapping_clients/mock_mapping_service_client/src/mock_mapping_service_client.rs index 408fda2a..446cb68a 100644 --- a/mapping_clients/mock_mapping_service_client/src/mock_mapping_service_client.rs +++ b/mapping_clients/mock_mapping_service_client/src/mock_mapping_service_client.rs @@ -44,13 +44,13 @@ impl MockMappingServiceClient { #[async_trait] impl MappingClient for MockMappingServiceClient { /// Creates a new instance of a CloudAdapter with default settings - fn create_new() -> Result, MappingClientError> { + fn create_new() -> Result { let config_contents = fs::read_to_string(Path::new(env!("OUT_DIR")).join(CONFIG_FILE)) .map_err(MappingClientError::io)?; let config: Config = serde_json::from_str(config_contents.as_str()) .map_err(MappingClientError::deserialize)?; - Ok(Box::new(Self::from_config(config))) + Ok(Self::from_config(config)) } /// Checks for any additional work that the mapping service requires. 
diff --git a/provider_proxies/grpc/v1/src/grpc_provider_proxy.rs b/provider_proxies/grpc/v1/src/grpc_provider_proxy.rs index 4a92685b..cccd511c 100644 --- a/provider_proxies/grpc/v1/src/grpc_provider_proxy.rs +++ b/provider_proxies/grpc/v1/src/grpc_provider_proxy.rs @@ -19,9 +19,8 @@ use samples_protobuf_data_access::sample_grpc::v1::{ use tonic::transport::{Channel, Server}; use crate::grpc_client_impl::GRPCClientImpl; -use freyja_contracts::{ - entity::EntityID, - provider_proxy::{OperationKind, ProviderProxy, ProviderProxyError, SignalValue}, +use freyja_contracts::provider_proxy::{ + OperationKind, ProviderProxy, ProviderProxyError, SignalValue, }; const CONSUMER_ADDR: &str = "[::1]:60010"; @@ -33,7 +32,7 @@ pub struct GRPCProviderProxy { provider_client: DigitalTwinProviderClient, /// Local cache for keeping track of which entities this provider proxy contains - entity_operation_map: Arc>>, + entity_operation_map: Arc>>, /// Shared queue for all proxies to push new signal values of entities signal_values_queue: Arc>, diff --git a/provider_proxies/http_mock_provider_proxy/src/http_mock_provider_proxy.rs b/provider_proxies/http_mock_provider_proxy/src/http_mock_provider_proxy.rs index 9edf7db4..44256879 100644 --- a/provider_proxies/http_mock_provider_proxy/src/http_mock_provider_proxy.rs +++ b/provider_proxies/http_mock_provider_proxy/src/http_mock_provider_proxy.rs @@ -16,7 +16,6 @@ use reqwest::Client; use crate::config::{Settings, CALLBACK_FOR_VALUES_PATH, CONFIG_FILE}; use freyja_contracts::digital_twin_adapter::{EntityValueRequest, EntityValueResponse}; -use freyja_contracts::entity::EntityID; use freyja_contracts::provider_proxy::{ OperationKind, ProviderProxy, ProviderProxyError, SignalValue, }; @@ -39,7 +38,7 @@ pub struct HttpMockProviderProxy { client: Client, /// Local cache for keeping track of which entities this provider proxy contains - entity_operation_map: Arc>>, + entity_operation_map: Arc>>, /// Shared queue for all proxies to push new signal 
values signal_values_queue: Arc>, diff --git a/provider_proxies/in_memory_mock_provider_proxy/src/config.rs b/provider_proxies/in_memory_mock_provider_proxy/src/config.rs index e30f2fed..73f87ed2 100644 --- a/provider_proxies/in_memory_mock_provider_proxy/src/config.rs +++ b/provider_proxies/in_memory_mock_provider_proxy/src/config.rs @@ -4,13 +4,11 @@ use serde::{Deserialize, Serialize}; -use freyja_contracts::entity::EntityID; - /// Configuration for a entity #[derive(Clone, Debug, Serialize, Deserialize)] pub struct EntityConfig { /// The entity id - pub entity_id: EntityID, + pub entity_id: String, /// The config for the sensor values pub values: SensorValueConfig, diff --git a/provider_proxies/in_memory_mock_provider_proxy/src/in_memory_provider_proxy.rs b/provider_proxies/in_memory_mock_provider_proxy/src/in_memory_provider_proxy.rs index fc60596c..97a79abf 100644 --- a/provider_proxies/in_memory_mock_provider_proxy/src/in_memory_provider_proxy.rs +++ b/provider_proxies/in_memory_mock_provider_proxy/src/in_memory_provider_proxy.rs @@ -17,9 +17,8 @@ use crossbeam::queue::SegQueue; use log::info; use crate::config::{EntityConfig, Settings}; -use freyja_contracts::{ - entity::EntityID, - provider_proxy::{OperationKind, ProviderProxy, ProviderProxyError, SignalValue}, +use freyja_contracts::provider_proxy::{ + OperationKind, ProviderProxy, ProviderProxyError, SignalValue, }; const CONFIG_FILE: &str = "config.json"; @@ -28,10 +27,10 @@ const SUPPORTED_OPERATIONS: &[OperationKind] = &[OperationKind::Get, OperationKi #[derive(Debug)] pub struct InMemoryMockProviderProxy { /// Maps the number of calls to each provider so we can mock changing behavior - data: HashMap, + data: HashMap, /// Local cache for keeping track of which entities this provider proxy contains - entity_operation_map: Arc>>, + entity_operation_map: Arc>>, /// Shared queue for all proxies to push new signal values of entities signal_values_queue: Arc>, @@ -97,7 +96,7 @@ impl 
InMemoryMockProviderProxy { fn generate_signal_value( entity_id: &str, signal_values_queue: Arc>, - data: &HashMap, + data: &HashMap, ) -> Result<(), ProviderProxyError> { let (entity_config, counter) = data .get(entity_id) @@ -140,7 +139,7 @@ impl ProviderProxy for InMemoryMockProviderProxy { info!("Started an InMemoryMockProviderProxy!"); loop { - let entities_with_subscribe: Vec; + let entities_with_subscribe: Vec; { entities_with_subscribe = self diff --git a/provider_proxy_selector/src/provider_proxy_selector.rs b/provider_proxy_selector/src/provider_proxy_selector.rs index 218d9961..799d96fe 100644 --- a/provider_proxy_selector/src/provider_proxy_selector.rs +++ b/provider_proxy_selector/src/provider_proxy_selector.rs @@ -14,7 +14,7 @@ use strum_macros::{Display, EnumString}; use tokio::{sync::mpsc::UnboundedReceiver, time::Duration}; use freyja_contracts::{ - entity::{Entity, EntityID, ProviderURI}, + entity::Entity, provider_proxy::{OperationKind, ProviderProxy, ProviderProxyError, SignalValue}, provider_proxy_request::ProviderProxySelectorRequestKind, }; @@ -130,10 +130,10 @@ impl ProviderProxyKind { /// The provider proxy selector selects which provider proxy to create based on protocol and operation pub struct ProviderProxySelector { /// A map of entity uri to provider proxy - pub provider_proxies: Arc>>, + pub provider_proxies: Arc>>, /// A map of entity id to provider uri - pub entity_map: Arc>>, + pub entity_map: Arc>>, } impl ProviderProxySelector { @@ -165,15 +165,15 @@ impl ProviderProxySelector { debug!("Handling new request {:?}", message); match message { - ProviderProxySelectorRequestKind::CreateOrUpdateProviderProxy( + ProviderProxySelectorRequestKind::CreateOrUpdateProviderProxy { entity_id, - provider_uri, + uri, protocol, operation, - ) => { + } => { let entity = Entity { id: entity_id, - uri: provider_uri, + uri, name: None, description: None, operation, @@ -186,7 +186,7 @@ impl ProviderProxySelector { ) .await; } - 
ProviderProxySelectorRequestKind::GetEntityValue(entity_id) => { + ProviderProxySelectorRequestKind::GetEntityValue { entity_id } => { let provider_uri; { let lock = self.entity_map.lock().unwrap();