Skip to content

Commit

Permalink
feat(rust): expose hedera_subscribe via ffi
Browse files Browse the repository at this point in the history
  • Loading branch information
mehcode committed Jun 6, 2022
1 parent 0f96d88 commit ea53173
Show file tree
Hide file tree
Showing 7 changed files with 88 additions and 9 deletions.
9 changes: 9 additions & 0 deletions sdk/c/include/hedera.h
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,15 @@ enum HederaError hedera_schedule_id_from_string(const char *s, struct HederaSche
*/
struct HederaSigner *hedera_signer_private_key(struct HederaPrivateKey *key);

/**
* Subscribe with this request against the provided client of the Hedera network.
* On successful completion, calls `callback` with `ERROR_OK` and a `NULL` `message`.
*/
enum HederaError hedera_subscribe(const struct HederaClient *client,
const char *request,
const void *context,
void (*callback)(const void *context, enum HederaError err, const char *message));

/**
* Parse a Hedera `TokenId` from the passed string.
*/
Expand Down
2 changes: 1 addition & 1 deletion sdk/rust/src/ffi/callback.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ impl<T> Callback<T> {
Self { handle, context }
}

pub(super) fn call(self, err: Error, value: T) {
pub(super) fn call(&self, err: Error, value: T) {
(self.handle)(self.context, err, value)
}
}
Expand Down
2 changes: 1 addition & 1 deletion sdk/rust/src/ffi/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ pub(crate) fn set_last_error(error: crate::Error) {
}

/// Represents any possible result from a fallible function in the Hedera SDK.
#[derive(Debug)]
#[derive(Debug, PartialEq, Eq, Copy, Clone)]
#[repr(C)]
pub enum Error {
Ok,
Expand Down
8 changes: 1 addition & 7 deletions sdk/rust/src/ffi/execute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,13 @@ use std::ffi::{c_void, CString};
use std::os::raw::c_char;
use std::ptr::null;

use once_cell::sync::Lazy;
use tokio::runtime::{self, Runtime};

use crate::ffi::callback::Callback;
use crate::ffi::error::Error;
use crate::ffi::runtime::RUNTIME;
use crate::ffi::util::cstr_from_ptr;
use crate::transaction::AnyTransaction;
use crate::{AnyMirrorQuery, AnyQuery, Client};

static RUNTIME: Lazy<Runtime> = Lazy::new(|| {
runtime::Builder::new_multi_thread().enable_all().max_blocking_threads(8).build().unwrap()
});

thread_local! {
static EXECUTE_RESPONSE: RefCell<CString> = RefCell::new(CString::new("").unwrap());
}
Expand Down
2 changes: 2 additions & 0 deletions sdk/rust/src/ffi/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@ mod execute;
mod file_id;
mod private_key;
mod public_key;
mod runtime;
mod schedule_id;
mod signer;
mod subscribe;
mod token_id;
mod topic_id;
mod util;
7 changes: 7 additions & 0 deletions sdk/rust/src/ffi/runtime.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
use once_cell::sync::Lazy;
use tokio::runtime;
use tokio::runtime::Runtime;

pub(super) static RUNTIME: Lazy<Runtime> = Lazy::new(|| {
runtime::Builder::new_multi_thread().enable_all().max_blocking_threads(8).build().unwrap()
});
67 changes: 67 additions & 0 deletions sdk/rust/src/ffi/subscribe.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
use std::cell::RefCell;
use std::ffi::{c_void, CString};
use std::os::raw::c_char;
use std::ptr::null;

use futures_util::StreamExt;

use crate::ffi::callback::Callback;
use crate::ffi::error::Error;
use crate::ffi::runtime::RUNTIME;
use crate::ffi::util::cstr_from_ptr;
use crate::{AnyMirrorQuery, Client};

thread_local! {
static SUBSCRIBE_MESSAGE: RefCell<CString> = RefCell::new(CString::new("").unwrap());
}

/// Subscribe with this request against the provided client of the Hedera network.
/// On successful completion, calls `callback` with `ERROR_OK` and a `NULL` `message`.
#[no_mangle]
pub extern "C" fn hedera_subscribe(
client: *const Client,
request: *const c_char,
context: *const c_void,
callback: extern "C" fn(context: *const c_void, err: Error, message: *const c_char),
) -> Error {
assert!(!client.is_null());

let client = unsafe { &*client };
let request = unsafe { cstr_from_ptr(request) };

let request: AnyMirrorQuery =
ffi_try!(serde_json::from_str(&request).map_err(crate::Error::request_parse));

let callback = Callback::new(context, callback);

RUNTIME.spawn(async move {
let mut stream = request.subscribe(client);

while let Some(message) = stream.next().await {
let message = message.map(|message| {
let message = serde_json::to_string(&message).unwrap();

SUBSCRIBE_MESSAGE.with(|message_text| {
*message_text.borrow_mut() = CString::new(message).unwrap();

message_text.borrow().as_ptr()
})
});

let (err, message) = match message {
Ok(message) => (Error::Ok, message),
Err(error) => (Error::new(error), null()),
};

callback.call(err, message);

if err != Error::Ok {
return;
}
}

callback.call(Error::Ok, null());
});

Error::Ok
}

0 comments on commit ea53173

Please sign in to comment.