Skip to content

Commit

Permalink
Feat: Initial implementation to load and init a WASM plugin
Browse files Browse the repository at this point in the history
  • Loading branch information
rholshausen committed Jul 23, 2024
1 parent 2f45660 commit 821ccd7
Show file tree
Hide file tree
Showing 14 changed files with 2,937 additions and 437 deletions.
1,105 changes: 1,098 additions & 7 deletions drivers/rust/driver/Cargo.lock

Large diffs are not rendered by default.

9 changes: 6 additions & 3 deletions drivers/rust/driver/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,11 @@ exclude = [
]

[features]
default = ["datetime", "xml", "lua"]
default = ["datetime", "xml", "lua", "wasm"]
datetime = ["pact_models/datetime"] # Support for date/time matchers and expressions
xml = ["pact_models/xml"] # support for matching XML documents
lua = ["dep:mlua"] # support for plugins written in Lua
xml = ["pact_models/xml"] # Support for matching XML documents
lua = ["dep:mlua"] # Support for plugins written in Lua
wasm = ["dep:wasmtime", "dep:wasmtime-wasi"] # Support for WASM plugins

[dependencies]
anyhow = "1.0.86"
Expand Down Expand Up @@ -57,6 +58,8 @@ tonic = "0.12.0"
tracing = { version = "0.1.40", features = [ "log" ] }
tracing-core = "0.1.32"
uuid = { version = "1.10.0", features = ["v4"] }
wasmtime = { version = "23.0.1", optional = true }
wasmtime-wasi = { version = "23.0.1", optional = true }
zip = "2.1.3"

[dev-dependencies]
Expand Down
3 changes: 2 additions & 1 deletion drivers/rust/driver/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,5 @@ pub mod verification;
pub mod repository;
pub mod download;
pub mod grpc_plugin;
#[cfg(feature = "lua")] mod lua_plugin;
#[cfg(feature = "lua")] pub(crate) mod lua_plugin;
#[cfg(feature = "wasm")] pub(crate) mod wasm_plugin;
98 changes: 62 additions & 36 deletions drivers/rust/driver/src/plugin_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ use crate::plugin_models::{PactPlugin, PactPluginManifest, PluginDependency};
use crate::repository::{fetch_repository_index, USER_AGENT};
use crate::utils::versions_compatible;
use crate::verification::{InteractionVerificationData, InteractionVerificationResult};
use crate::wasm_plugin::load_wasm_plugin;

lazy_static! {
static ref PLUGIN_MANIFEST_REGISTER: Mutex<HashMap<String, PactPluginManifest>> = Mutex::new(HashMap::new());
Expand All @@ -46,41 +47,53 @@ pub async fn load_plugin<'a>(plugin: &PluginDependency) -> anyhow::Result<Arc<dy
let thread_id = thread::current().id();
debug!("Loading plugin {:?}", plugin);
trace!("Rust plugin driver version {}", option_env!("CARGO_PKG_VERSION").unwrap_or_default());
trace!("load_plugin {:?}: Waiting on PLUGIN_REGISTER lock", thread_id);
let mut inner = PLUGIN_REGISTER.lock().unwrap();
trace!("load_plugin {:?}: Got PLUGIN_REGISTER lock", thread_id);
let update_access = |plugin: &mut (dyn PactPlugin + Send + Sync)| {
debug!("Found running plugin {:?}", plugin);
plugin.update_access();
plugin.arced()
};
let result = match with_plugin_mut(plugin, &mut inner, &update_access) {
Some(plugin) => Ok(plugin),
None => {
debug!("Did not find plugin, will attempt to start it");
let manifest = match load_plugin_manifest(plugin) {
Ok(manifest) => manifest,
Err(err) => {
warn!("Could not load plugin manifest from disk, will try auto install it: {}", err);
let http_client = reqwest::ClientBuilder::new()
.user_agent(USER_AGENT)
.build()?;
let index = fetch_repository_index(&http_client, None).await?;
match index.lookup_plugin_version(&plugin.name, &plugin.version) {
Some(entry) => {
info!("Found an entry for the plugin in the plugin index, will try install that");
install_plugin_from_url(&http_client, entry.source.value().as_str()).await?

let result = {
trace!("load_plugin {:?}: Waiting on PLUGIN_REGISTER lock", thread_id);
let mut inner = PLUGIN_REGISTER.lock().unwrap();
trace!("load_plugin {:?}: Got PLUGIN_REGISTER lock", thread_id);
let update_access = |plugin: &mut (dyn PactPlugin + Send + Sync)| {
debug!("Found running plugin {:?}", plugin);
plugin.update_access();
plugin.arced()
};
let result = match with_plugin_mut(plugin, &mut inner, &update_access) {
Some(plugin) => Ok((plugin, false)),
None => {
debug!("Did not find plugin, will attempt to start it");
let manifest = match load_plugin_manifest(plugin) {
Ok(manifest) => manifest,
Err(err) => {
warn!("Could not load plugin manifest from disk, will try auto install it: {}", err);
let http_client = reqwest::ClientBuilder::new()
.user_agent(USER_AGENT)
.build()?;
let index = fetch_repository_index(&http_client, None).await?;
match index.lookup_plugin_version(&plugin.name, &plugin.version) {
Some(entry) => {
info!("Found an entry for the plugin in the plugin index, will try install that");
install_plugin_from_url(&http_client, entry.source.value().as_str()).await?
}
None => Err(err)?
}
None => Err(err)?
}
}
};
send_metrics(&manifest);
initialise_plugin(&manifest, &mut inner).await
}
};
send_metrics(&manifest);
initialise_plugin(&manifest, &mut inner).await
.map(|plugin| (plugin, true))
}
};
trace!("load_plugin {:?}: Releasing PLUGIN_REGISTER lock", thread_id);
result
};
trace!("load_plugin {:?}: Releasing PLUGIN_REGISTER lock", thread_id);
result

if let Ok((_, new_plugin)) = &result {
if *new_plugin {
publish_updated_catalogue();
}
}

result.map(|(plugin, _)| plugin)
}

fn lookup_plugin_inner(
Expand Down Expand Up @@ -230,7 +243,6 @@ async fn initialise_plugin<'a>(
plugin.kill();
anyhow!("Failed to send init request to the plugin - {}", err)
})?;
publish_updated_catalogue();

let arc = Arc::new(plugin);
let key = format!("{}/{}", manifest.name, manifest.version);
Expand All @@ -245,9 +257,6 @@ async fn initialise_plugin<'a>(

plugin.init()?;

// TODO: This causes a deadlock
//publish_updated_catalogue();

let arc = Arc::new(plugin);
let key = format!("{}/{}", manifest.name, manifest.version);
plugin_register.insert(key, arc.clone());
Expand All @@ -258,6 +267,23 @@ async fn initialise_plugin<'a>(
Err(anyhow!("Lua plugins are not supported (Lua feature flag is not enabled)"))
}
}
"wasm" => {
#[cfg(feature = "wasm")] {
let plugin = load_wasm_plugin(manifest)?;
debug!("Plugin loaded OK ({:?}), sending init message", plugin);

plugin.init()?;

let arc = Arc::new(plugin);
let key = format!("{}/{}", manifest.name, manifest.version);
plugin_register.insert(key, arc.clone());

Ok(arc)
}
#[cfg(not(feature = "wasm"))] {
Err(anyhow!("WASM plugins are not supported (wasm feature flag is not enabled)"))
}
}
_ => Err(anyhow!("Plugin executable type of {} is not supported", manifest.executable_type))
}
}
Expand Down
Loading

0 comments on commit 821ccd7

Please sign in to comment.