diff --git a/Cargo.toml b/Cargo.toml index a9bd7fe..f215545 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "nacos-sdk-rust-binding-py" -version = "0.3.5" +version = "0.3.6-ALPHA" edition = "2021" license = "Apache-2.0" publish = false @@ -18,10 +18,13 @@ crate-type = ["cdylib"] doc = false [dependencies] -pyo3 = "0.18" +pyo3 = "0.20" +pyo3-asyncio = { version = "0.20", features = ["tokio-runtime"] } +# for block api +futures = { version = "0.3", default-features = false, features = [] } -nacos-sdk = { version = "0.3.5", features = ["default"] } -#nacos-sdk = { git = "https://github.com/nacos-group/nacos-sdk-rust.git", features = ["default"] } +nacos-sdk = { version = "0.3.5", features = ["async"] } +#nacos-sdk = { git = "https://github.com/nacos-group/nacos-sdk-rust.git", features = ["async"] } tracing-subscriber = { version = "0.3", features = ["default"] } #tracing-subscriber = { version = "0.3", features = ["env-filter", "local-time"] } # occur `` diff --git a/examples/async_config.py b/examples/async_config.py new file mode 100644 index 0000000..f9f806a --- /dev/null +++ b/examples/async_config.py @@ -0,0 +1,66 @@ +#!/usr/bin/python3 + +import asyncio +import nacos_sdk_rust_binding_py as nacos + +client_options = nacos.ClientOptions("0.0.0.0:8848", "love", "simple_app_py", "nacos", "nacos") + +# 一般大部分情况下,应用下仅需一个客户端,而且需要长期持有直至应用停止。 +# 因为它内部会初始化与服务端的长链接,后续的数据交互及服务变更等订阅,都是实时地通过长链接告知客户端的。 +config_client = nacos.AsyncNacosConfigClient(client_options) + + +# 自定义配置监听的函数,接受的参数为 `nacos.NacosConfigResponse` +def listen_config(config_resp: nacos.NacosConfigResponse): + print(f"listen_config,config_resp={str(config_resp)}") + print(f"listen_config,config_resp.content={config_resp.content}") + + +async def main(): + await asyncio.sleep(1) + + data_id = "todo-dataid" + group = "LOVE" + publish_content = "test-content" + + # 添加配置监听(对目标 data_id, group 配置变化的监听) + await config_client.add_listener(data_id, group, listen_config) + + # 推送配置 + await config_client.publish_config(data_id, group, publish_content) + + await asyncio.sleep(1) + + # 获取配置,返回值为 `nacos.NacosConfigResponse` + config_content_resp = await config_client.get_config_resp(data_id, group) + + # 获取配置,返回值为 content: String + get_config_content = await config_client.get_config(data_id, group) + + assert get_config_content == publish_content + assert config_content_resp.content == publish_content + + print(f"get_config_content={get_config_content}") + print(f"config_content_resp={str(config_content_resp)},resp_content={config_content_resp.content}") + + await asyncio.sleep(1) + + # 推送配置,使配置监听函数被调用 + await config_client.publish_config(data_id, group, "publish_content for listen_config") + + # 等待一段时间供用户查看 Nacos 服务器上被监听的配置 + await asyncio.sleep(300) + + # 删除配置 + await config_client.remove_config(data_id, group) + + # 获取已删除的配置,会抛出异常 + try: + get_config_content_removed = await config_client.get_config(data_id, group) + except RuntimeError: + print("config already be removed.") + + await asyncio.sleep(10) + +# 运行主任务 +asyncio.run(main()) diff --git a/examples/async_naming.py b/examples/async_naming.py new file mode 100644 index 0000000..7612143 --- /dev/null +++ b/examples/async_naming.py @@ -0,0 +1,55 @@ +#!/usr/bin/python3 + +import asyncio +import nacos_sdk_rust_binding_py as nacos + +client_options = nacos.ClientOptions("0.0.0.0:8848", "love", "simple_app_py", "nacos", "nacos") + +# 一般大部分情况下,应用下仅需一个客户端,而且需要长期持有直至应用停止。 +# 因为它内部会初始化与服务端的长链接,后续的数据交互及服务变更等订阅,都是实时地通过长链接告知客户端的。 +naming_client = nacos.AsyncNacosNamingClient(client_options) + + +# 自定义服务订阅函数,接受的参数为 `nacos.NacosConfigResponse` +def subscribe_instances(instances: [nacos.NacosServiceInstance]): + print(f"subscribe_instances,instances={str(instances)}") + for ins in instances: + print(f"subscribe_instances,instances[x].ip={ins.ip}") + + +async def main(): + await asyncio.sleep(1) + + service_name = "todo-service-name" + group = "dev" + service_instance = nacos.NacosServiceInstance("127.0.0.1", 8080) + + # 添加服务订阅(对目标 service_name, group 的服务实例变化的监听) + await naming_client.subscribe(service_name, group, None, subscribe_instances) + + await asyncio.sleep(1) + + # 注册服务实例 + await naming_client.register_instance(service_name, group, service_instance) + + await asyncio.sleep(1) + + # 获取服务实例列表 + get_instances = await naming_client.get_all_instances(service_name, group) + + assert len(get_instances) > 0 + assert get_instances[0].ip == service_instance.ip + + print(f"get_instances={str(get_instances)}") + for i in get_instances: + print(f"get_instances[x].ip={i.ip}") + + # 批量注册服务实例,可使前面的配置监听函数被调用 + service_instance2 = nacos.NacosServiceInstance("127.0.0.2", 8080) + await naming_client.batch_register_instance(service_name, group, [service_instance, service_instance2]) + + # 等待一段时间 + await asyncio.sleep(300) + +# 运行主任务 +asyncio.run(main()) diff --git a/examples/config.py b/examples/config.py index 07b4cc3..87e0923 100644 --- a/examples/config.py +++ b/examples/config.py @@ -5,6 +5,8 @@ client_options = nacos.ClientOptions("0.0.0.0:8848", "love", "simple_app_py", "nacos", "nacos") +# 一般大部分情况下,应用下仅需一个客户端,而且需要长期持有直至应用停止。 +# 因为它内部会初始化与服务端的长链接,后续的数据交互及服务变更等订阅,都是实时地通过长链接告知客户端的。 config_client = nacos.NacosConfigClient(client_options) time.sleep(1) diff --git a/examples/naming.py b/examples/naming.py index 5e3a674..2530ca2 100644 --- a/examples/naming.py +++ b/examples/naming.py @@ -5,6 +5,8 @@ client_options = nacos.ClientOptions("0.0.0.0:8848", "love", "simple_app_py", "nacos", "nacos") +# 一般大部分情况下,应用下仅需一个客户端,而且需要长期持有直至应用停止。 +# 因为它内部会初始化与服务端的长链接,后续的数据交互及服务变更等订阅,都是实时地通过长链接告知客户端的。 naming_client = nacos.NacosNamingClient(client_options) time.sleep(1) diff --git a/src/async_config.rs b/src/async_config.rs new file mode 100644 index 0000000..808c493 --- /dev/null +++ b/src/async_config.rs @@ -0,0 +1,170 @@ +#![deny(clippy::all)] + +use pyo3::exceptions::{PyRuntimeError, PyValueError}; +use pyo3::{pyclass, pymethods, PyAny, PyErr, PyResult, Python, ToPyObject}; +use pyo3_asyncio::tokio::future_into_py; + +use std::sync::Arc; + +use crate::config::{transfer_conf_resp, NacosConfigChangeListener}; + +/// Async Client api of Nacos Config. +#[pyclass(module = "nacos_sdk_rust_binding_py")] +pub struct AsyncNacosConfigClient { + inner: Arc, +} + +#[pymethods] +impl AsyncNacosConfigClient { + /// Build a Config Client. + #[new] + pub fn new(client_options: crate::ClientOptions) -> PyResult { + // print to console or file + let _ = crate::init_logger(); + + let props = nacos_sdk::api::props::ClientProps::new() + .server_addr(client_options.server_addr) + .namespace(client_options.namespace) + .app_name( + client_options + .app_name + .unwrap_or(nacos_sdk::api::constants::UNKNOWN.to_string()), + ); + + // need enable_auth_plugin_http with username & password + let is_enable_auth = client_options.username.is_some() && client_options.password.is_some(); + + let props = if is_enable_auth { + props + .auth_username(client_options.username.unwrap()) + .auth_password(client_options.password.unwrap()) + } else { + props + }; + + let config_service_builder = if is_enable_auth { + nacos_sdk::api::config::ConfigServiceBuilder::new(props).enable_auth_plugin_http() + } else { + nacos_sdk::api::config::ConfigServiceBuilder::new(props) + }; + + let config_service = config_service_builder + .build() + .map_err(|nacos_err| PyRuntimeError::new_err(format!("{:?}", &nacos_err)))?; + + Ok(Self { + inner: Arc::new(config_service), + }) + } + + /// Get config's content. + /// If it fails, pay attention to err + pub fn get_config<'p>( + &self, + py: Python<'p>, + data_id: String, + group: String, + ) -> PyResult<&'p PyAny> { + let this = self.inner.clone(); + future_into_py(py, async move { + let config_resp = this + .get_config(data_id, group) + .await + .map_err(|nacos_err| PyRuntimeError::new_err(format!("{:?}", &nacos_err)))?; + Ok(transfer_conf_resp(config_resp).content) + }) + } + + /// Get NacosConfigResponse. + /// If it fails, pay attention to err + pub fn get_config_resp<'p>( + &self, + py: Python<'p>, + data_id: String, + group: String, + ) -> PyResult<&'p PyAny> { + let this = self.inner.clone(); + future_into_py(py, async move { + let config_resp = this + .get_config(data_id, group) + .await + .map_err(|nacos_err| PyRuntimeError::new_err(format!("{:?}", &nacos_err)))?; + Ok(transfer_conf_resp(config_resp)) + }) + } + + /// Publish config. + /// If it fails, pay attention to err + pub fn publish_config<'p>( + &self, + py: Python<'p>, + data_id: String, + group: String, + content: String, + ) -> PyResult<&'p PyAny> { + let this = self.inner.clone(); + future_into_py(py, async move { + this.publish_config(data_id, group, content, None) + .await + .map_err(|nacos_err| PyRuntimeError::new_err(format!("{:?}", &nacos_err))) + }) + } + + /// Remove config. + /// If it fails, pay attention to err + pub fn remove_config<'p>( + &self, + py: Python<'p>, + data_id: String, + group: String, + ) -> PyResult<&'p PyAny> { + let this = self.inner.clone(); + future_into_py(py, async move { + this.remove_config(data_id, group) + .await + .map_err(|nacos_err| PyRuntimeError::new_err(format!("{:?}", &nacos_err))) + }) + } + + /// Add NacosConfigChangeListener callback func, which listen the config change. + /// If it fails, pay attention to err + #[pyo3(signature = (data_id, group, listener))] + pub fn add_listener<'p>( + &self, + py: Python<'p>, + data_id: String, + group: String, + listener: &PyAny, // PyFunction arg: + ) -> PyResult<&'p PyAny> { + if !listener.is_callable() { + return Err(PyErr::new::( + "Arg `listener` must be a callable", + )); + } + let listen_wrap = Arc::new(NacosConfigChangeListener { + func: Arc::new(listener.to_object(py)), + }); + let this = self.inner.clone(); + future_into_py(py, async move { + this.add_listener(data_id, group, listen_wrap) + .await + .map_err(|nacos_err| PyRuntimeError::new_err(format!("{:?}", &nacos_err)))?; + Ok(()) + }) + } + + /// Remove NacosConfigChangeListener callback func, but noop.... + /// The logic is not implemented internally, and only APIs are provided as compatibility. + /// Users maybe do not need it? Not removing the listener is not a big problem, Sorry! + #[pyo3(signature = (data_id, group, listener))] + #[allow(unused_variables)] + pub fn remove_listener<'p>( + &self, + py: Python<'p>, + data_id: String, + group: String, + listener: &PyAny, // PyFunction arg: + ) -> PyResult<&'p PyAny> { + future_into_py(py, async { Ok(()) }) + } +} diff --git a/src/async_naming.rs b/src/async_naming.rs new file mode 100644 index 0000000..c541310 --- /dev/null +++ b/src/async_naming.rs @@ -0,0 +1,263 @@ +#![deny(clippy::all)] + +use pyo3::exceptions::{PyRuntimeError, PyValueError}; +use pyo3::{pyclass, pymethods, PyAny, PyErr, PyResult, Python, ToPyObject}; +use pyo3_asyncio::tokio::future_into_py; + +use std::sync::Arc; + +use crate::naming::{ + transfer_ffi_instance_to_rust, transfer_rust_instance_to_ffi, NacosNamingEventListener, + NacosServiceInstance, +}; + +/// Async Client api of Nacos Naming. +#[pyclass(module = "nacos_sdk_rust_binding_py")] +pub struct AsyncNacosNamingClient { + inner: Arc, +} + +#[pymethods] +impl AsyncNacosNamingClient { + /// Build a Naming Client. + #[new] + pub fn new(client_options: crate::ClientOptions) -> PyResult { + // print to console or file + let _ = crate::init_logger(); + + let props = nacos_sdk::api::props::ClientProps::new() + .server_addr(client_options.server_addr) + .namespace(client_options.namespace) + .app_name( + client_options + .app_name + .unwrap_or(nacos_sdk::api::constants::UNKNOWN.to_string()), + ) + .naming_push_empty_protection( + client_options.naming_push_empty_protection.unwrap_or(true), + ) + .naming_load_cache_at_start(client_options.naming_load_cache_at_start.unwrap_or(false)); + + // need enable_auth_plugin_http with username & password + let is_enable_auth = client_options.username.is_some() && client_options.password.is_some(); + + let props = if is_enable_auth { + props + .auth_username(client_options.username.unwrap()) + .auth_password(client_options.password.unwrap()) + } else { + props + }; + + let naming_service_builder = if is_enable_auth { + nacos_sdk::api::naming::NamingServiceBuilder::new(props).enable_auth_plugin_http() + } else { + nacos_sdk::api::naming::NamingServiceBuilder::new(props) + }; + + let naming_service = naming_service_builder + .build() + .map_err(|nacos_err| PyRuntimeError::new_err(format!("{:?}", &nacos_err)))?; + + Ok(Self { + inner: Arc::new(naming_service), + }) + } + + /// Register instance. + /// If it fails, pay attention to err + pub fn register_instance<'p>( + &self, + py: Python<'p>, + service_name: String, + group: String, + service_instance: NacosServiceInstance, + ) -> PyResult<&'p PyAny> { + let this = self.inner.clone(); + future_into_py(py, async move { + this.register_instance( + service_name, + Some(group), + transfer_ffi_instance_to_rust(&service_instance), + ) + .await + .map_err(|nacos_err| PyRuntimeError::new_err(format!("{:?}", &nacos_err))) + }) + } + + /// Deregister instance. + /// If it fails, pay attention to err + pub fn deregister_instance<'p>( + &self, + py: Python<'p>, + service_name: String, + group: String, + service_instance: NacosServiceInstance, + ) -> PyResult<&'p PyAny> { + let this = self.inner.clone(); + future_into_py(py, async move { + this.deregister_instance( + service_name, + Some(group), + transfer_ffi_instance_to_rust(&service_instance), + ) + .await + .map_err(|nacos_err| PyRuntimeError::new_err(format!("{:?}", &nacos_err))) + }) + } + + /// Batch register instance, improve interaction efficiency. + /// If it fails, pay attention to err + pub fn batch_register_instance<'p>( + &self, + py: Python<'p>, + service_name: String, + group: String, + service_instances: Vec, + ) -> PyResult<&'p PyAny> { + let this = self.inner.clone(); + future_into_py(py, async move { + let rust_instances = service_instances + .iter() + .map(transfer_ffi_instance_to_rust) + .collect(); + this.batch_register_instance(service_name, Some(group), rust_instances) + .await + .map_err(|nacos_err| PyRuntimeError::new_err(format!("{:?}", &nacos_err))) + }) + } + + /// Get all instances by service and group. default cluster=[], subscribe=true. + /// If it fails, pay attention to err + pub fn get_all_instances<'p>( + &self, + py: Python<'p>, + service_name: String, + group: String, + clusters: Option>, + subscribe: Option, + ) -> PyResult<&'p PyAny> { + let this = self.inner.clone(); + future_into_py(py, async move { + let rust_instances = this + .get_all_instances( + service_name, + Some(group), + clusters.unwrap_or_default(), + subscribe.unwrap_or(true), + ) + .await + .map_err(|nacos_err| PyRuntimeError::new_err(format!("{:?}", &nacos_err)))?; + + Ok(rust_instances + .iter() + .map(transfer_rust_instance_to_ffi) + .collect::>()) + }) + } + + /// Select instances whether healthy or not. default cluster=[], subscribe=true, healthy=true. + /// If it fails, pay attention to err + pub fn select_instances<'p>( + &self, + py: Python<'p>, + service_name: String, + group: String, + clusters: Option>, + subscribe: Option, + healthy: Option, + ) -> PyResult<&'p PyAny> { + let this = self.inner.clone(); + future_into_py(py, async move { + let rust_instances = this + .select_instances( + service_name, + Some(group), + clusters.unwrap_or_default(), + subscribe.unwrap_or(true), + healthy.unwrap_or(true), + ) + .await + .map_err(|nacos_err| PyRuntimeError::new_err(format!("{:?}", &nacos_err)))?; + Ok(rust_instances + .iter() + .map(transfer_rust_instance_to_ffi) + .collect::>()) + }) + } + + /// Select one healthy instance. default cluster=[], subscribe=true. + /// If it fails, pay attention to err + pub fn select_one_healthy_instance<'p>( + &self, + py: Python<'p>, + service_name: String, + group: String, + clusters: Option>, + subscribe: Option, + ) -> PyResult<&'p PyAny> { + let this = self.inner.clone(); + future_into_py(py, async move { + let rust_instance = this + .select_one_healthy_instance( + service_name, + Some(group), + clusters.unwrap_or_default(), + subscribe.unwrap_or(true), + ) + .await + .map_err(|nacos_err| PyRuntimeError::new_err(format!("{:?}", &nacos_err)))?; + Ok(transfer_rust_instance_to_ffi(&rust_instance)) + }) + } + + /// Add NacosNamingEventListener callback func, which listen the instance change. + /// If it fails, pay attention to err + #[pyo3(signature = (service_name, group, clusters, listener))] + pub fn subscribe<'p>( + &self, + py: Python<'p>, + service_name: String, + group: String, + clusters: Option>, + listener: &PyAny, // PyFunction arg: Vec + ) -> PyResult<&'p PyAny> { + if !listener.is_callable() { + return Err(PyErr::new::( + "Arg `listener` must be a callable", + )); + } + let listen_wrap = Arc::new(NacosNamingEventListener { + func: Arc::new(listener.to_object(py)), + }); + let this = self.inner.clone(); + + future_into_py(py, async move { + this.subscribe( + service_name, + Some(group), + clusters.unwrap_or_default(), + listen_wrap, + ) + .await + .map_err(|nacos_err| PyRuntimeError::new_err(format!("{:?}", &nacos_err)))?; + Ok(()) + }) + } + + /// Remove NacosNamingEventListener callback func, but noop.... + /// The logic is not implemented internally, and only APIs are provided as compatibility. + /// Users maybe do not need it? Not removing the subscription is not a big problem, Sorry! + #[pyo3(signature = (service_name, group, clusters, listener))] + #[allow(unused_variables)] + pub fn un_subscribe<'p>( + &self, + py: Python<'p>, + service_name: String, + group: String, + clusters: Option>, + listener: &PyAny, // PyFunction arg: Vec + ) -> PyResult<&'p PyAny> { + future_into_py(py, async move { Ok(()) }) + } +} diff --git a/src/config.rs b/src/config.rs index adc0a9e..1a478fc 100644 --- a/src/config.rs +++ b/src/config.rs @@ -6,16 +6,16 @@ use pyo3::{pyclass, pymethods, PyAny, PyErr, PyObject, PyResult, Python, ToPyObj use std::sync::Arc; /// Client api of Nacos Config. -#[pyclass] +#[pyclass(module = "nacos_sdk_rust_binding_py")] pub struct NacosConfigClient { - inner: Arc, + pub(crate) inner: Arc, } #[pymethods] impl NacosConfigClient { /// Build a Config Client. #[new] - pub fn new(client_options: crate::ClientOptions) -> PyResult { + pub fn new(client_options: crate::ClientOptions) -> PyResult { // print to console or file let _ = crate::init_logger(); @@ -64,9 +64,8 @@ impl NacosConfigClient { /// Get NacosConfigResponse. /// If it fails, pay attention to err pub fn get_config_resp(&self, data_id: String, group: String) -> PyResult { - let config_resp = self - .inner - .get_config(data_id, group) + let future = self.inner.get_config(data_id, group); + let config_resp = futures::executor::block_on(future) .map_err(|nacos_err| PyRuntimeError::new_err(format!("{:?}", &nacos_err)))?; Ok(transfer_conf_resp(config_resp)) } @@ -79,16 +78,16 @@ impl NacosConfigClient { group: String, content: String, ) -> PyResult { - self.inner - .publish_config(data_id, group, content, None) + let future = self.inner.publish_config(data_id, group, content, None); + futures::executor::block_on(future) .map_err(|nacos_err| PyRuntimeError::new_err(format!("{:?}", &nacos_err))) } /// Remove config. /// If it fails, pay attention to err pub fn remove_config(&self, data_id: String, group: String) -> PyResult { - self.inner - .remove_config(data_id, group) + let future = self.inner.remove_config(data_id, group); + futures::executor::block_on(future) .map_err(|nacos_err| PyRuntimeError::new_err(format!("{:?}", &nacos_err))) } @@ -107,14 +106,14 @@ impl NacosConfigClient { "Arg `listener` must be a callable", )); } - self.inner - .add_listener( - data_id, - group, - Arc::new(NacosConfigChangeListener { - func: Arc::new(listener.to_object(py)), - }), - ) + let future = self.inner.add_listener( + data_id, + group, + Arc::new(NacosConfigChangeListener { + func: Arc::new(listener.to_object(py)), + }), + ); + futures::executor::block_on(future) .map_err(|nacos_err| PyRuntimeError::new_err(format!("{:?}", &nacos_err)))?; Ok(()) } @@ -135,7 +134,7 @@ impl NacosConfigClient { } } -#[pyclass] +#[pyclass(module = "nacos_sdk_rust_binding_py")] pub struct NacosConfigResponse { /// Namespace/Tenant #[pyo3(get)] @@ -157,8 +156,8 @@ pub struct NacosConfigResponse { pub md5: String, } -pub struct NacosConfigChangeListener { - func: Arc, +pub(crate) struct NacosConfigChangeListener { + pub(crate) func: Arc, } impl nacos_sdk::api::config::ConfigChangeListener for NacosConfigChangeListener { @@ -173,7 +172,9 @@ impl nacos_sdk::api::config::ConfigChangeListener for NacosConfigChangeListener } } -fn transfer_conf_resp(config_resp: nacos_sdk::api::config::ConfigResponse) -> NacosConfigResponse { +pub(crate) fn transfer_conf_resp( + config_resp: nacos_sdk::api::config::ConfigResponse, +) -> NacosConfigResponse { NacosConfigResponse { namespace: config_resp.namespace().to_string(), data_id: config_resp.data_id().to_string(), diff --git a/src/lib.rs b/src/lib.rs index 7a32ee9..bfb7a3f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -15,6 +15,9 @@ fn nacos_sdk_rust_binding_py(_py: Python, m: &PyModule) -> PyResult<()> { m.add_class::()?; m.add_class::()?; m.add_class::()?; + // Async Client api + m.add_class::()?; + m.add_class::()?; Ok(()) } @@ -52,7 +55,7 @@ fn init_logger() -> &'static tracing_appender::non_blocking::WorkerGuard { &LOG_GUARD } -#[pyclass] +#[pyclass(module = "nacos_sdk_rust_binding_py")] #[derive(Clone)] pub struct ClientOptions { /// Server Addr, e.g. address:port[,address:port],...] @@ -107,3 +110,9 @@ pub use config::*; mod naming; pub use naming::*; + +mod async_config; +pub use async_config::*; + +mod async_naming; +pub use async_naming::*; diff --git a/src/naming.rs b/src/naming.rs index 7d818f3..755c7ac 100644 --- a/src/naming.rs +++ b/src/naming.rs @@ -6,7 +6,7 @@ use pyo3::{pyclass, pymethods, PyAny, PyErr, PyObject, PyResult, Python, ToPyObj use std::sync::Arc; /// Client api of Nacos Naming. -#[pyclass] +#[pyclass(module = "nacos_sdk_rust_binding_py")] pub struct NacosNamingClient { inner: Arc, } @@ -15,7 +15,7 @@ pub struct NacosNamingClient { impl NacosNamingClient { /// Build a Naming Client. #[new] - pub fn new(client_options: crate::ClientOptions) -> PyResult { + pub fn new(client_options: crate::ClientOptions) -> PyResult { // print to console or file let _ = crate::init_logger(); @@ -66,12 +66,12 @@ impl NacosNamingClient { group: String, service_instance: NacosServiceInstance, ) -> PyResult<()> { - self.inner - .register_instance( - service_name, - Some(group), - transfer_ffi_instance_to_rust(&service_instance), - ) + let future = self.inner.register_instance( + service_name, + Some(group), + transfer_ffi_instance_to_rust(&service_instance), + ); + futures::executor::block_on(future) .map_err(|nacos_err| PyRuntimeError::new_err(format!("{:?}", &nacos_err))) } @@ -83,12 +83,12 @@ impl NacosNamingClient { group: String, service_instance: NacosServiceInstance, ) -> PyResult<()> { - self.inner - .deregister_instance( - service_name, - Some(group), - transfer_ffi_instance_to_rust(&service_instance), - ) + let future = self.inner.deregister_instance( + service_name, + Some(group), + transfer_ffi_instance_to_rust(&service_instance), + ); + futures::executor::block_on(future) .map_err(|nacos_err| PyRuntimeError::new_err(format!("{:?}", &nacos_err))) } @@ -105,8 +105,10 @@ impl NacosNamingClient { .map(transfer_ffi_instance_to_rust) .collect(); - self.inner - .batch_register_instance(service_name, Some(group), rust_instances) + let future = self + .inner + .batch_register_instance(service_name, Some(group), rust_instances); + futures::executor::block_on(future) .map_err(|nacos_err| PyRuntimeError::new_err(format!("{:?}", &nacos_err))) } @@ -119,14 +121,13 @@ impl NacosNamingClient { clusters: Option>, subscribe: Option, ) -> PyResult> { - let rust_instances = self - .inner - .get_all_instances( - service_name, - Some(group), - clusters.unwrap_or_default(), - subscribe.unwrap_or(true), - ) + let future = self.inner.get_all_instances( + service_name, + Some(group), + clusters.unwrap_or_default(), + subscribe.unwrap_or(true), + ); + let rust_instances = futures::executor::block_on(future) .map_err(|nacos_err| PyRuntimeError::new_err(format!("{:?}", &nacos_err)))?; Ok(rust_instances @@ -145,15 +146,14 @@ impl NacosNamingClient { subscribe: Option, healthy: Option, ) -> PyResult> { - let rust_instances = self - .inner - .select_instances( - service_name, - Some(group), - clusters.unwrap_or_default(), - subscribe.unwrap_or(true), - healthy.unwrap_or(true), - ) + let future = self.inner.select_instances( + service_name, + Some(group), + clusters.unwrap_or_default(), + subscribe.unwrap_or(true), + healthy.unwrap_or(true), + ); + let rust_instances = futures::executor::block_on(future) .map_err(|nacos_err| PyRuntimeError::new_err(format!("{:?}", &nacos_err)))?; Ok(rust_instances @@ -171,14 +171,13 @@ impl NacosNamingClient { clusters: Option>, subscribe: Option, ) -> PyResult { - let rust_instance = self - .inner - .select_one_healthy_instance( - service_name, - Some(group), - clusters.unwrap_or_default(), - subscribe.unwrap_or(true), - ) + let future = self.inner.select_one_healthy_instance( + service_name, + Some(group), + clusters.unwrap_or_default(), + subscribe.unwrap_or(true), + ); + let rust_instance = futures::executor::block_on(future) .map_err(|nacos_err| PyRuntimeError::new_err(format!("{:?}", &nacos_err)))?; Ok(transfer_rust_instance_to_ffi(&rust_instance)) @@ -200,15 +199,15 @@ impl NacosNamingClient { "Arg `listener` must be a callable", )); } - self.inner - .subscribe( - service_name, - Some(group), - clusters.unwrap_or_default(), - Arc::new(NacosNamingEventListener { - func: Arc::new(listener.to_object(py)), - }), - ) + let future = self.inner.subscribe( + service_name, + Some(group), + clusters.unwrap_or_default(), + Arc::new(NacosNamingEventListener { + func: Arc::new(listener.to_object(py)), + }), + ); + futures::executor::block_on(future) .map_err(|nacos_err| PyRuntimeError::new_err(format!("{:?}", &nacos_err)))?; Ok(()) } @@ -230,8 +229,8 @@ impl NacosNamingClient { } } -pub struct NacosNamingEventListener { - func: Arc, +pub(crate) struct NacosNamingEventListener { + pub(crate) func: Arc, } impl nacos_sdk::api::naming::NamingEventListener for NacosNamingEventListener { @@ -255,7 +254,7 @@ impl nacos_sdk::api::naming::NamingEventListener for NacosNamingEventListener { } } -#[pyclass] +#[pyclass(module = "nacos_sdk_rust_binding_py")] #[derive(Clone)] pub struct NacosServiceInstance { /// Instance Id @@ -320,7 +319,7 @@ impl NacosServiceInstance { } } -fn transfer_ffi_instance_to_rust( +pub(crate) fn transfer_ffi_instance_to_rust( ffi_instance: &NacosServiceInstance, ) -> nacos_sdk::api::naming::ServiceInstance { nacos_sdk::api::naming::ServiceInstance { @@ -337,7 +336,7 @@ fn transfer_ffi_instance_to_rust( } } -fn transfer_rust_instance_to_ffi( +pub(crate) fn transfer_rust_instance_to_ffi( rust_instance: &nacos_sdk::api::naming::ServiceInstance, ) -> NacosServiceInstance { NacosServiceInstance {