Skip to content

Commit

Permalink
decouple resource and resource inner (#86)
Browse files Browse the repository at this point in the history
* decoupled resources and watch

Signed-off-by: Jiaxiao Zhou <[email protected]>

fixing format typo

Signed-off-by: Dan Chiarlone <[email protected]>
  • Loading branch information
Mossaka authored and danbugs committed Jul 23, 2022
1 parent 9276756 commit 3e387af
Show file tree
Hide file tree
Showing 16 changed files with 406 additions and 425 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions crates/events/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,5 @@ wit-error-rs = { git = "https://github.com/danbugs/wit-error-rs", rev = "05362f1
crossbeam-utils = "0.8"
crossbeam-channel = "0.5.5"
tracing = { version = "0.1", features = ["log"] }
proc_macro_utils = { path = "../proc_macro_utils" }
uuid = { version = "1.1.2", features = ["v4"] }
93 changes: 51 additions & 42 deletions crates/events/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@ use std::{
time::{Duration, Instant},
};

use anyhow::Result;
use anyhow::{Context, Result};
use crossbeam_utils::thread;
use proc_macro_utils::Resource;

use crate::events::Error;
use crate::events::Observable as GeneratedObservable;
Expand All @@ -15,9 +16,11 @@ use events_api::{AttributesReader, Event, EventHandler, EventParam};
use runtime::{
impl_resource,
resource::{
get_table, Ctx, DataT, Linker, Resource, ResourceMap, ResourceTables, RuntimeResource,
get_table, Ctx, HostState, Linker, Resource, ResourceBuilder, ResourceMap, ResourceTables,
StateTable,
},
};
use uuid::Uuid;
use wasmtime::Store;

use crate::events::add_to_linker;
Expand All @@ -28,22 +31,41 @@ wit_error_rs::impl_from!(anyhow::Error, Error::ErrorWithDescription);
const SCHEME_NAME: &str = "events";

/// Events capability
#[derive(Default)]
#[derive(Default, Resource)]
pub struct Events {
observables: Vec<Observable>,
host_state: Option<ResourceMap>,
host_state: EventsState,
}

#[derive(Default)]
pub struct EventsState {
resource_map: ResourceMap,
event_handler: Option<Arc<Mutex<EventHandler<Ctx>>>>,
store: Option<Arc<Mutex<Store<Ctx>>>>,
}

impl EventsState {
pub fn new(resource_map: Arc<Mutex<StateTable>>) -> Self {
Self {
resource_map,
..Default::default()
}
}
}

impl_resource!(
Events,
events::EventsTables<Events>,
ResourceMap,
EventsState,
SCHEME_NAME.to_string()
);

#[derive(Clone, Debug, Default)]
pub struct EventsGuest {
observables: Vec<Observable>,
}

/// An owned observable
#[derive(Clone, Debug)]
struct Observable {
rd: String,
key: String,
Expand All @@ -70,61 +92,48 @@ impl Events {
store: Arc<Mutex<Store<Ctx>>>,
event_handler: Arc<Mutex<EventHandler<Ctx>>>,
) -> Result<()> {
self.event_handler = Some(event_handler);
self.store = Some(store);
self.host_state.event_handler = Some(event_handler);
self.host_state.store = Some(store);
Ok(())
}
}

impl Resource for Events {
fn get_inner(&self) -> &dyn std::any::Any {
unimplemented!("events will not be dynamically dispatched to a specific resource")
}

fn watch(
&mut self,
_data: &str,
_rd: &str,
_key: &str,
_sender: Arc<Mutex<Sender<Event>>>,
) -> Result<()> {
unimplemented!("events will not be listened to")
}
}

impl events::Events for Events {
type Events = ();
type Events = EventsGuest;
fn events_get(&mut self) -> Result<Self::Events, Error> {
Ok(())
Ok(Default::default())
}

fn events_listen(
&mut self,
_events: &Self::Events,
self_: &Self::Events,
ob: GeneratedObservable<'_>,
) -> Result<(), Error> {
// TODO (Joe): I can't figure out how to not deep copy the Observable here to satisfy the
// Rust lifetime rules.
let ob2 = ob.into();
self.observables.push(ob2);
Ok(())
) -> Result<Self::Events, Error> {
Uuid::parse_str(ob.rd)
.with_context(|| "internal error: failed to parse internal handle to this resource")?;
let ob = ob.into();
// FIXME: the reason I had to clone the observable is because the observable is owned by
// self_ which is not a mutable reference.
let mut observables = self_.observables.clone();
observables.push(ob);
Ok(Self::Events { observables })
}

fn events_exec(&mut self, _events: &Self::Events, duration: u64) -> Result<(), Error> {
for ob in &self.observables {
fn events_exec(&mut self, self_: &Self::Events, duration: u64) -> Result<(), Error> {
for ob in &self_.observables {
// check if observable has changed
let map = self.host_state.as_mut().unwrap();

let map = self.host_state.resource_map.clone();

let mut map = map.lock().unwrap();
let data = map.get::<String>(&ob.rd).unwrap().to_string();
let resource = &mut map.get_dynamic_mut(&ob.rd).unwrap().0;
resource.watch(&data, &ob.rd, &ob.key, ob.sender.clone())?;
let resource = map.get_mut(&ob.rd).unwrap();
resource.watch(&ob.key, ob.sender.clone())?;
}
thread::scope(|s| -> Result<()> {
let mut thread_handles = vec![];
for ob in &self.observables {
let handler = self.event_handler.as_ref().unwrap().clone();
let store = self.store.as_mut().unwrap().clone();
for ob in &self_.observables {
let handler = self.host_state.event_handler.as_ref().unwrap().clone();
let store = self.host_state.store.as_mut().unwrap().clone();
let receiver = ob.receiver.clone();
let receive_thread = s.spawn(move |_| loop {
match receiver
Expand Down
94 changes: 48 additions & 46 deletions crates/kv-azure-blob/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@ use azure_storage_blobs::prelude::*;
use crossbeam_channel::Sender;
use events_api::Event;
use futures::executor::block_on;
use proc_macro_utils::Resource;
use proc_macro_utils::{Resource, Watch};
use runtime::{
impl_resource,
resource::{
get_table, BasicState, Ctx, DataT, Linker, Map, Resource, ResourceTables, RuntimeResource,
get_table, BasicState, Ctx, HostState, Linker, Resource, ResourceBuilder, ResourceTables,
Watch,
},
};
use std::sync::{Arc, Mutex};
Expand All @@ -28,8 +29,7 @@ const SCHEME_NAME: &str = "azblobkv";
/// A Azure Blob Storage implementation for the kv interface
#[derive(Default, Clone, Resource)]
pub struct KvAzureBlob {
inner: Option<Arc<ContainerClient>>,
host_state: Option<BasicState>,
host_state: BasicState,
}

impl_resource!(
Expand All @@ -39,9 +39,19 @@ impl_resource!(
SCHEME_NAME.to_string()
);

impl KvAzureBlob {
/// Create a new `KvAzureBlob`
fn new(storage_account_name: &str, storage_account_key: &str, container_name: &str) -> Self {
#[derive(Default, Clone, Debug, Watch)]
pub struct KvAzureBlobInner {
container_client: Option<Arc<ContainerClient>>,
rd: String,
}

impl KvAzureBlobInner {
pub fn new(
storage_account_name: &str,
storage_account_key: &str,
container_name: &str,
rd: String,
) -> Self {
let http_client = azure_core::new_http_client();
let inner = Some(
StorageAccountClient::new_access_key(
Expand All @@ -52,45 +62,46 @@ impl KvAzureBlob {
.as_container_client(container_name),
);
Self {
inner,
host_state: None,
container_client: inner,
rd,
}
}
}

impl kv::Kv for KvAzureBlob {
type Kv = String;
type Kv = KvAzureBlobInner;
/// Construct a new `KvAzureBlob` from a container name. For example, a container name could be "my-container"
fn kv_open(&mut self, name: &str) -> Result<Self::Kv, kv::Error> {
let storage_account_name = String::from_utf8(runtime_configs::providers::get(
&self.host_state.as_ref().unwrap().secret_store,
&self.host_state.secret_store,
"AZURE_STORAGE_ACCOUNT",
&self.host_state.as_ref().unwrap().config_toml_file_path,
&self.host_state.config_toml_file_path,
)?)?;
let storage_account_key = String::from_utf8(runtime_configs::providers::get(
&self.host_state.as_ref().unwrap().secret_store,
&self.host_state.secret_store,
"AZURE_STORAGE_KEY",
&self.host_state.as_ref().unwrap().config_toml_file_path,
&self.host_state.config_toml_file_path,
)?)?;

let kv_azure_blob = KvAzureBlob::new(&storage_account_name, &storage_account_key, name);
self.inner = kv_azure_blob.inner;

let rd = Uuid::new_v4().to_string();
let cloned = self.clone(); // have to clone here because of the mutable borrow below
let mut map = Map::lock(&mut self.host_state.as_mut().unwrap().resource_map)?;
map.set(rd.clone(), (Box::new(cloned), None));
Ok(rd)
let kv_azure_blob_guest = KvAzureBlobInner::new(
&storage_account_name,
&storage_account_key,
name,
rd.clone(),
);

self.host_state
.resource_map
.lock()
.unwrap()
.set(rd, Box::new(kv_azure_blob_guest.clone()));
Ok(kv_azure_blob_guest)
}

/// Output the value of a set key
fn kv_get(&mut self, self_: &Self::Kv, key: &str) -> Result<kv::PayloadResult, kv::Error> {
Uuid::parse_str(self_).with_context(|| {
"internal kv::error: failed to parse internal handle to this resource"
})?;

let map = Map::lock(&mut self.host_state.as_mut().unwrap().resource_map)?;
let inner = map.get::<Arc<ContainerClient>>(self_)?;
fn kv_get(&mut self, self_: &Self::Kv, key: &str) -> Result<PayloadResult, Error> {
let inner = self_.container_client.as_ref().unwrap();
let blob_client = inner.as_blob_client(key);
let res = block_on(azure::get(blob_client))
.with_context(|| format!("failed to get value for key {}", key))?;
Expand All @@ -102,14 +113,10 @@ impl kv::Kv for KvAzureBlob {
&mut self,
self_: &Self::Kv,
key: &str,
value: kv::PayloadParam<'_>,
) -> Result<(), kv::Error> {
Uuid::parse_str(self_).with_context(|| {
"internal kv::error: failed to parse internal handle to this resource"
})?;

let map = Map::lock(&mut self.host_state.as_mut().unwrap().resource_map)?;
let inner = map.get::<Arc<ContainerClient>>(self_)?;
value: PayloadParam<'_>,
) -> Result<(), Error> {
let inner = self_.container_client.as_ref().unwrap();

let blob_client = inner.as_blob_client(key);
let value = Vec::from(value);
block_on(azure::set(blob_client, value))
Expand All @@ -118,22 +125,17 @@ impl kv::Kv for KvAzureBlob {
}

/// Delete a key-value pair
fn kv_delete(&mut self, self_: &Self::Kv, key: &str) -> Result<(), kv::Error> {
Uuid::parse_str(self_).with_context(|| {
"internal kv::error: failed to parse internal handle to this resource"
})?;

let map = Map::lock(&mut self.host_state.as_mut().unwrap().resource_map)?;
let inner = map.get::<Arc<ContainerClient>>(self_)?;

fn kv_delete(&mut self, self_: &Self::Kv, key: &str) -> Result<(), Error> {
let inner = self_.container_client.as_ref().unwrap();
let blob_client = inner.as_blob_client(key);
block_on(azure::delete(blob_client)).with_context(|| "failed to delete key's value")?;
Ok(())
}

fn kv_watch(&mut self, self_: &Self::Kv, key: &str) -> Result<Observable, kv::Error> {
/// Watch for changes to a key-value pair
fn kv_watch(&mut self, self_: &Self::Kv, key: &str) -> Result<Observable, Error> {
Ok(Observable {
rd: self_.to_string(),
rd: self_.rd.clone(),
key: key.to_string(),
})
}
Expand Down
Loading

0 comments on commit 3e387af

Please sign in to comment.