diff --git a/.github/workflows/check-new-ver.yml b/.github/workflows/check-new-ver.yml index da7e7e0..e35d913 100644 --- a/.github/workflows/check-new-ver.yml +++ b/.github/workflows/check-new-ver.yml @@ -6,7 +6,6 @@ on: jobs: check: - if: ${{ false }} # TODO: Re-enable once #38 is merged. runs-on: ubuntu-latest steps: - uses: actions/checkout@v4 diff --git a/CHANGELOG.md b/CHANGELOG.md index 55c3b46..7492834 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,16 +6,28 @@ All user visible changes to `tigerbeetle-unofficial`, `tigerbeetle-unofficial-co -## [master] · unreleased -[master]: /../../tree/v0.5.0%2B0.16.11 +## [0.6.0+0.16.17] · unreleased +[0.6.0+0.16.17]: /../../tree/v0.6.0%2B0.16.17 -[Diff](/../../compare/v0.5.0%2B0.16.11...master) | [Milestone](/../../milestone/2) +[Diff](/../../compare/v0.5.0%2B0.16.11...v0.6.0%2B0.16.17) | [Milestone](/../../milestone/2) + +### BC Breaks + +- Upgraded [`tb_client` C library] to [0.16.17 version][tb-0.16.17]: ([#38]) + - Replaced `payload` argument with `reply` in `core::Callbacks::on_competion()` to provide cluster `timestamp` of `Reply` generation. ([tigerbeetle/tigerbeetle#2481]) + - Replaced `TIGERBEETLE_LOG_LEVEL` build time env var with `TB_CLIENT_DEBUG` one, since `config-log-level` build option was removed, but no FFI yet added for configuring runtime log filtering. ([tigerbeetle/tigerbeetle#2539]) ### Added +- `SendErrorKind::ClientEvicted` variant. ([#38], [tigerbeetle/tigerbeetle#2484]) - `id()` function generating [TigerBeetle Time-Based Identifiers](https://docs.tigerbeetle.com/coding/data-modeling#tigerbeetle-time-based-identifiers-recommended). ([#39]) +[#38]: /../../pull/38 [#39]: /../../pull/39 +[tb-0.16.17]: https://github.com/tigerbeetle/tigerbeetle/blob/0.16.17/CHANGELOG.md#tigerbeetle-01617 +[tigerbeetle/tigerbeetle#2539]: https://github.com/tigerbeetle/tigerbeetle/pull/2539 +[tigerbeetle/tigerbeetle#2481]: https://github.com/tigerbeetle/tigerbeetle/pull/2481 +[tigerbeetle/tigerbeetle#2484]: https://github.com/tigerbeetle/tigerbeetle/pull/2484 @@ -27,7 +39,7 @@ All user visible changes to `tigerbeetle-unofficial`, `tigerbeetle-unofficial-co ### BC Breaks -- Upgraded [`tb_client` C library] to [0.16.1 version][tb-0.16.1]. ([#24], [#19], [#18]) +- Upgraded [`tb_client` C library] to [0.16.11 version][tb-0.16.11]. ([#24], [#19], [#18]) - Removed `concurrency_max` argument from `Client::new()`, `Client::with_callback()` and `Client::with_callback_unchecked()` methods. ([#24], [#19]) - Replaced `Client::acquire()` and `ClientHandle::acquire()` methods with `Client::packet()` and `Packet::new()`. ([#24], [#19], [#34]) - Removed `error::AcquirePacketError` type. ([#24], [#19]) @@ -49,7 +61,7 @@ All user visible changes to `tigerbeetle-unofficial`, `tigerbeetle-unofficial-co [#24]: /../../pull/24 [#26]: /../../pull/26 [#34]: /../../pull/34 -[tb-0.16.1]: https://github.com/tigerbeetle/tigerbeetle/blob/0.16.11/CHANGELOG.md#tigerbeetle-01611 +[tb-0.16.11]: https://github.com/tigerbeetle/tigerbeetle/blob/0.16.11/CHANGELOG.md#tigerbeetle-01611 diff --git a/Cargo.toml b/Cargo.toml index a95f4d6..bea17bd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,7 +15,7 @@ tokio-rt-multi-thread = ["core/tokio-rt-multi-thread"] [dependencies] bytemuck = { version = "1.16", features = ["extern_crate_alloc"] } -core = { version = "=0.5.0+0.16.11", package = "tigerbeetle-unofficial-core", path = "core" } +core = { version = "=0.6.0+0.16.17", package = "tigerbeetle-unofficial-core", path = "core" } fastrand = "2.3" tokio = { version = "1.28.1", features = ["sync"] } @@ -26,7 +26,7 @@ pollster = { version = "0.4", features = ["macro"] } members = ["sys", "core"] [workspace.package] -version = "0.5.0+0.16.11" +version = "0.6.0+0.16.17" authors = ["Daria Sukhonina "] rust-version = "1.78" repository = "https://github.com/tigerbeetle-rust/tigerbeetle-unofficial" diff --git a/core/Cargo.toml b/core/Cargo.toml index 91da848..37706b0 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -17,5 +17,5 @@ tokio-rt-multi-thread = ["dep:tokio", "tokio/rt-multi-thread"] [dependencies] bytemuck = "1.19" sptr = "0.3.2" -sys = { version = "=0.5.0+0.16.11", package = "tigerbeetle-unofficial-sys", path = "../sys", features = ["generated-safe"] } +sys = { version = "=0.6.0+0.16.17", package = "tigerbeetle-unofficial-sys", path = "../sys", features = ["generated-safe"] } tokio = { version = "1.28.1", optional = true } diff --git a/core/examples/c_port_low_level.rs b/core/examples/c_port_low_level.rs index 48b7bce..63a6f9c 100644 --- a/core/examples/c_port_low_level.rs +++ b/core/examples/c_port_low_level.rs @@ -260,16 +260,20 @@ impl UserData { impl tb::Callbacks for Callbacks { type UserDataPtr = Box; - fn on_completion(&self, packet: tb::Packet<'_, Self::UserDataPtr>, payload: &[u8]) { + fn on_completion( + &self, + packet: tb::Packet<'_, Self::UserDataPtr>, + reply: Option>, + ) { let status = packet.status(); let user_data = packet.into_user_data(); let ctx = user_data.ctx; - { + if let Some(reply) = reply { let mut state = ctx.state.lock().unwrap(); - state.reply[..payload.len()].copy_from_slice(payload); - state.size = payload.len(); - ctx.cv.notify_one(); + state.reply[..reply.payload.len()].copy_from_slice(reply.payload); + state.size = reply.payload.len(); } + ctx.cv.notify_one(); user_data.free(status); } } diff --git a/core/src/callback.rs b/core/src/callback.rs index 029cc5a..cc599cd 100644 --- a/core/src/callback.rs +++ b/core/src/callback.rs @@ -1,4 +1,9 @@ -use std::{marker::PhantomData, panic::catch_unwind, slice}; +use std::{ + marker::PhantomData, + panic::catch_unwind, + slice, + time::{Duration, SystemTime}, +}; use crate::util::RawConstPtr; @@ -33,16 +38,30 @@ mod callbacks_ptr { } } -// `Self: Sync` because `F` is called from some zig thread. +/// Reply returned by [`Callbacks`]. +#[non_exhaustive] +pub struct Reply<'a> { + /// Returned raw payload of this [`Reply`] as bytes. + pub payload: &'a [u8], + + /// Cluster timestamp when the reply was generated. + pub timestamp: SystemTime, +} + +// `Self: Sync` because `F` is called from some Zig thread. pub trait Callbacks: Sync { type UserDataPtr: UserDataPtr; - fn on_completion(&self, packet: Packet<'_, Self::UserDataPtr>, payload: &[u8]); + /// Calls back once a [`Packet`] is submitted and processed. + /// + /// [`None`] `reply` means that submitting the [`Packet`] failed (check the [`Packet::status`] + /// for the reason). + fn on_completion(&self, packet: Packet<'_, Self::UserDataPtr>, reply: Option>); } pub struct CallbacksFn where - F: Fn(Packet<'_, U>, &[u8]) + Sync, + F: Fn(Packet<'_, U>, Option>) + Sync, U: UserDataPtr, { inner: F, @@ -51,7 +70,7 @@ where impl CallbacksFn where - F: Fn(Packet<'_, U>, &[u8]) + Sync, + F: Fn(Packet<'_, U>, Option>) + Sync, U: UserDataPtr, { pub const fn new(inner: F) -> Self @@ -68,19 +87,19 @@ where impl Callbacks for CallbacksFn where - F: Fn(Packet<'_, U>, &[u8]) + Sync, + F: Fn(Packet<'_, U>, Option>) + Sync, U: UserDataPtr, { type UserDataPtr = U; - fn on_completion(&self, packet: Packet<'_, Self::UserDataPtr>, payload: &[u8]) { - (self.inner)(packet, payload) + fn on_completion(&self, packet: Packet<'_, Self::UserDataPtr>, reply: Option>) { + (self.inner)(packet, reply) } } pub const fn on_completion_fn(f: F) -> CallbacksFn where - F: Fn(Packet<'_, U>, &[u8]) + Sync, + F: Fn(Packet<'_, U>, Option>) + Sync, U: UserDataPtr, { CallbacksFn::new(f) @@ -90,6 +109,7 @@ pub(crate) unsafe extern "C" fn on_completion_raw_fn( ctx: usize, raw_client: sys::tb_client_t, packet: *mut sys::tb_packet_t, + timestamp: u64, payload: *const u8, payload_size: u32, ) where @@ -97,9 +117,10 @@ pub(crate) unsafe extern "C" fn on_completion_raw_fn( { let _ = catch_unwind(|| { let cb = &*sptr::from_exposed_addr::(ctx); - let payload_size = payload_size - .try_into() - .expect("At the start of calling on_completion callback: unable to convert payload_size from u32 into usize"); + let payload_size = payload_size.try_into().expect( + "at the start of calling `on_completion` callback: \ + unable to convert `payload_size` from `u32` into `usize`", + ); let payload = if payload_size != 0 { slice::from_raw_parts(payload, payload_size) } else { @@ -112,7 +133,8 @@ pub(crate) unsafe extern "C" fn on_completion_raw_fn( on_completion: cb, }, }; - cb.on_completion(packet, payload) + let timestamp = SystemTime::UNIX_EPOCH + Duration::from_nanos(timestamp); + cb.on_completion(packet, Some(Reply { payload, timestamp })) }); } diff --git a/core/src/lib.rs b/core/src/lib.rs index a6f07d1..ae655c5 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -19,7 +19,7 @@ pub use query_filter::QueryFilter; pub use transfer::Transfer; type OnCompletionRawFn = - unsafe extern "C" fn(usize, sys::tb_client_t, *mut sys::tb_packet_t, *const u8, u32); + unsafe extern "C" fn(usize, sys::tb_client_t, *mut sys::tb_packet_t, u64, *const u8, u32); pub struct Client where @@ -84,7 +84,7 @@ where let mut raw = mem::zeroed(); let status = sys::tb_client_init( &mut raw, - cluster_id, + cluster_id.to_ne_bytes().as_ptr(), address.as_ptr().cast(), address .len() diff --git a/core/src/packet.rs b/core/src/packet.rs index 44090b7..660d93e 100644 --- a/core/src/packet.rs +++ b/core/src/packet.rs @@ -57,7 +57,7 @@ where let data = self.user_data().data(); let Ok(data_size) = data.len().try_into() else { self.set_status(Err(SendErrorKind::TooMuchData.into())); - self.handle.on_completion.on_completion(self, &[]); + self.handle.on_completion.on_completion(self, None); return; }; let data = data.as_ptr(); diff --git a/src/id.rs b/src/id.rs index b434806..c733055 100644 --- a/src/id.rs +++ b/src/id.rs @@ -114,7 +114,7 @@ mod id_spec { } // Port of upstream test: - // https://github.com/tigerbeetle/tigerbeetle/blob/0.16.11/src/clients/go/pkg/types/main_test.go#L75-L115 + // https://github.com/tigerbeetle/tigerbeetle/blob/0.16.17/src/clients/go/pkg/types/main_test.go#L75-L115 #[test] fn monotonic_fuzz() { fn verifier() { diff --git a/src/lib.rs b/src/lib.rs index c2a3704..3bdc4a6 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -161,13 +161,21 @@ impl Client { impl core::Callbacks for Callbacks { type UserDataPtr = Box; - fn on_completion(&self, packet: core::Packet<'_, Self::UserDataPtr>, payload: &[u8]) { + fn on_completion( + &self, + packet: core::Packet<'_, Self::UserDataPtr>, + reply: Option>, + ) { let status = packet.status(); let operation = packet.operation(); let user_data = packet.into_user_data(); user_data .reply_sender - .send(status.map(|()| Reply::copy_from_reply(operation.kind(), payload))) + .send(status.map(|()| { + // PANIC: Unwrapping is OK here, because the `reply` can only be `None` when the + // `status` is `Err`. + Reply::copy_from_reply(operation.kind(), reply.unwrap().payload) + })) .unwrap_or_else(|_| panic!("Unexpected: reply receiver is already dropped")); } } diff --git a/sys/build.rs b/sys/build.rs index 277609f..4424c89 100644 --- a/sys/build.rs +++ b/sys/build.rs @@ -15,10 +15,10 @@ use syn::visit::Visit; /// Version of the used [TigerBeetle] release. /// /// [TigerBeetle]: https://github.com/tigerbeetle/tigerbeetle -const TIGERBEETLE_RELEASE: &str = "0.16.11"; +const TIGERBEETLE_RELEASE: &str = "0.16.17"; /// Commit hash of the [`TIGERBEETLE_RELEASE`]. -const TIGERBEETLE_COMMIT: &str = "ea8a3e445fd1801d8f5ad1dbd6a9320861053912"; +const TIGERBEETLE_COMMIT: &str = "19a37355a64d09b0e35b14e5d1699e098bffdab8"; fn target_to_lib_dir(target: &str) -> Option<&'static str> { match target { @@ -54,11 +54,14 @@ const SCRIPT_EXTENSION: &str = "bat"; fn main() { assert!(env!("CARGO_PKG_VERSION").ends_with(TIGERBEETLE_RELEASE)); let out_dir: PathBuf = env::var("OUT_DIR").unwrap().into(); - let debug: bool = env::var("DEBUG").unwrap().parse().unwrap(); + let debug: bool = env::var("TB_CLIENT_DEBUG").map_or_else( + |_| env::var("DEBUG").unwrap().parse().unwrap(), + |s| s.parse().unwrap(), + ); let target = env::var("TARGET").unwrap(); - let log_level = env::var("TIGERBEETLE_LOG_LEVEL").unwrap_or_else(|_| "info".to_owned()); println!("cargo:rerun-if-env-changed=DOCS_RS"); + println!("cargo:rerun-if-env-changed=TB_CLIENT_DEBUG"); println!("cargo:rerun-if-changed=src/wrapper.h"); let wrapper; @@ -110,7 +113,6 @@ fn main() { .arg("clients:c") .args((!debug).then_some("-Drelease")) .arg(format!("-Dtarget={tigerbeetle_target}")) - .arg(format!("-Dconfig-log-level={log_level}")) .arg(format!("-Dconfig-release={TIGERBEETLE_RELEASE}")) .arg(format!("-Dconfig-release-client-min={TIGERBEETLE_RELEASE}")) .arg(format!("-Dgit-commit={TIGERBEETLE_COMMIT}")) diff --git a/sys/src/lib.rs b/sys/src/lib.rs index 0032a9f..9d697cc 100644 --- a/sys/src/lib.rs +++ b/sys/src/lib.rs @@ -30,7 +30,7 @@ mod linked { let address = "3000".as_bytes(); _ = crate::tb_client_init( &mut raw, - 1, + 1_u128.to_ne_bytes().as_ptr(), address.as_ptr().cast(), address.len().try_into().unwrap(), ptr::null::<()>() as usize, @@ -47,7 +47,7 @@ mod linked { let address = "3000".as_bytes(); _ = crate::tb_client_init_echo( &mut raw, - 1, + 1_u128.to_ne_bytes().as_ptr(), address.as_ptr().cast(), address.len().try_into().unwrap(), ptr::null::<()>() as usize, diff --git a/sys/tigerbeetle b/sys/tigerbeetle index ea8a3e4..19a3735 160000 --- a/sys/tigerbeetle +++ b/sys/tigerbeetle @@ -1 +1 @@ -Subproject commit ea8a3e445fd1801d8f5ad1dbd6a9320861053912 +Subproject commit 19a37355a64d09b0e35b14e5d1699e098bffdab8