Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Upgrade to 0.16.17 TigerBeetle #38

Merged
merged 13 commits into from
Dec 18, 2024
1 change: 0 additions & 1 deletion .github/workflows/check-new-ver.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ on:

jobs:
check:
if: ${{ false }} # TODO: Re-enable once #38 is merged.
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
Expand Down
22 changes: 17 additions & 5 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,28 @@ All user visible changes to `tigerbeetle-unofficial`, `tigerbeetle-unofficial-co



## [master] · unreleased
[master]: /../../tree/v0.5.0%2B0.16.11
## [0.6.0+0.16.17] · unreleased
[0.6.0+0.16.17]: /../../tree/v0.6.0%2B0.16.17

[Diff](/../../compare/v0.5.0%2B0.16.11...master) | [Milestone](/../../milestone/2)
[Diff](/../../compare/v0.5.0%2B0.16.11...v0.6.0%2B0.16.17) | [Milestone](/../../milestone/2)

### BC Breaks

- Upgraded [`tb_client` C library] to [0.16.17 version][tb-0.16.17]: ([#38])
- Replaced `payload` argument with `reply` in `core::Callbacks::on_competion()` to provide cluster `timestamp` of `Reply` generation. ([tigerbeetle/tigerbeetle#2481])
- Replaced `TIGERBEETLE_LOG_LEVEL` build time env var with `TB_CLIENT_DEBUG` one, since `config-log-level` build option was removed, but no FFI yet added for configuring runtime log filtering. ([tigerbeetle/tigerbeetle#2539])

### Added

- `SendErrorKind::ClientEvicted` variant. ([#38], [tigerbeetle/tigerbeetle#2484])
- `id()` function generating [TigerBeetle Time-Based Identifiers](https://docs.tigerbeetle.com/coding/data-modeling#tigerbeetle-time-based-identifiers-recommended). ([#39])

[#38]: /../../pull/38
[#39]: /../../pull/39
[tb-0.16.17]: https://github.com/tigerbeetle/tigerbeetle/blob/0.16.17/CHANGELOG.md#tigerbeetle-01617
[tigerbeetle/tigerbeetle#2539]: https://github.com/tigerbeetle/tigerbeetle/pull/2539
[tigerbeetle/tigerbeetle#2481]: https://github.com/tigerbeetle/tigerbeetle/pull/2481
[tigerbeetle/tigerbeetle#2484]: https://github.com/tigerbeetle/tigerbeetle/pull/2484



Expand All @@ -27,7 +39,7 @@ All user visible changes to `tigerbeetle-unofficial`, `tigerbeetle-unofficial-co

### BC Breaks

- Upgraded [`tb_client` C library] to [0.16.1 version][tb-0.16.1]. ([#24], [#19], [#18])
- Upgraded [`tb_client` C library] to [0.16.11 version][tb-0.16.11]. ([#24], [#19], [#18])
- Removed `concurrency_max` argument from `Client::new()`, `Client::with_callback()` and `Client::with_callback_unchecked()` methods. ([#24], [#19])
- Replaced `Client::acquire()` and `ClientHandle::acquire()` methods with `Client::packet()` and `Packet::new()`. ([#24], [#19], [#34])
- Removed `error::AcquirePacketError` type. ([#24], [#19])
Expand All @@ -49,7 +61,7 @@ All user visible changes to `tigerbeetle-unofficial`, `tigerbeetle-unofficial-co
[#24]: /../../pull/24
[#26]: /../../pull/26
[#34]: /../../pull/34
[tb-0.16.1]: https://github.com/tigerbeetle/tigerbeetle/blob/0.16.11/CHANGELOG.md#tigerbeetle-01611
[tb-0.16.11]: https://github.com/tigerbeetle/tigerbeetle/blob/0.16.11/CHANGELOG.md#tigerbeetle-01611



Expand Down
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ tokio-rt-multi-thread = ["core/tokio-rt-multi-thread"]

[dependencies]
bytemuck = { version = "1.16", features = ["extern_crate_alloc"] }
core = { version = "=0.5.0+0.16.11", package = "tigerbeetle-unofficial-core", path = "core" }
core = { version = "=0.6.0+0.16.17", package = "tigerbeetle-unofficial-core", path = "core" }
fastrand = "2.3"
tokio = { version = "1.28.1", features = ["sync"] }

Expand All @@ -26,7 +26,7 @@ pollster = { version = "0.4", features = ["macro"] }
members = ["sys", "core"]

[workspace.package]
version = "0.5.0+0.16.11"
version = "0.6.0+0.16.17"
authors = ["Daria Sukhonina <[email protected]>"]
rust-version = "1.78"
repository = "https://github.com/tigerbeetle-rust/tigerbeetle-unofficial"
Expand Down
2 changes: 1 addition & 1 deletion core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,5 @@ tokio-rt-multi-thread = ["dep:tokio", "tokio/rt-multi-thread"]
[dependencies]
bytemuck = "1.19"
sptr = "0.3.2"
sys = { version = "=0.5.0+0.16.11", package = "tigerbeetle-unofficial-sys", path = "../sys", features = ["generated-safe"] }
sys = { version = "=0.6.0+0.16.17", package = "tigerbeetle-unofficial-sys", path = "../sys", features = ["generated-safe"] }
tokio = { version = "1.28.1", optional = true }
14 changes: 9 additions & 5 deletions core/examples/c_port_low_level.rs
Original file line number Diff line number Diff line change
Expand Up @@ -260,16 +260,20 @@ impl UserData {
impl tb::Callbacks for Callbacks {
type UserDataPtr = Box<UserData>;

fn on_completion(&self, packet: tb::Packet<'_, Self::UserDataPtr>, payload: &[u8]) {
fn on_completion(
&self,
packet: tb::Packet<'_, Self::UserDataPtr>,
reply: Option<tb::Reply<'_>>,
) {
let status = packet.status();
let user_data = packet.into_user_data();
let ctx = user_data.ctx;
{
if let Some(reply) = reply {
let mut state = ctx.state.lock().unwrap();
state.reply[..payload.len()].copy_from_slice(payload);
state.size = payload.len();
ctx.cv.notify_one();
state.reply[..reply.payload.len()].copy_from_slice(reply.payload);
state.size = reply.payload.len();
}
ctx.cv.notify_one();
user_data.free(status);
}
}
48 changes: 35 additions & 13 deletions core/src/callback.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
use std::{marker::PhantomData, panic::catch_unwind, slice};
use std::{
marker::PhantomData,
panic::catch_unwind,
slice,
time::{Duration, SystemTime},
};

use crate::util::RawConstPtr;

Expand Down Expand Up @@ -33,16 +38,30 @@ mod callbacks_ptr {
}
}

// `Self: Sync` because `F` is called from some zig thread.
/// Reply returned by [`Callbacks`].
#[non_exhaustive]
pub struct Reply<'a> {
/// Returned raw payload of this [`Reply`] as bytes.
tyranron marked this conversation as resolved.
Show resolved Hide resolved
pub payload: &'a [u8],

/// Cluster timestamp when the reply was generated.
pub timestamp: SystemTime,
}

// `Self: Sync` because `F` is called from some Zig thread.
pub trait Callbacks: Sync {
type UserDataPtr: UserDataPtr;

fn on_completion(&self, packet: Packet<'_, Self::UserDataPtr>, payload: &[u8]);
/// Calls back once a [`Packet`] is submitted and processed.
///
/// [`None`] `reply` means that submitting the [`Packet`] failed (check the [`Packet::status`]
/// for the reason).
fn on_completion(&self, packet: Packet<'_, Self::UserDataPtr>, reply: Option<Reply<'_>>);
tyranron marked this conversation as resolved.
Show resolved Hide resolved
}

pub struct CallbacksFn<F, U>
where
F: Fn(Packet<'_, U>, &[u8]) + Sync,
F: Fn(Packet<'_, U>, Option<Reply<'_>>) + Sync,
U: UserDataPtr,
{
inner: F,
Expand All @@ -51,7 +70,7 @@ where

impl<F, U> CallbacksFn<F, U>
where
F: Fn(Packet<'_, U>, &[u8]) + Sync,
F: Fn(Packet<'_, U>, Option<Reply<'_>>) + Sync,
U: UserDataPtr,
{
pub const fn new(inner: F) -> Self
Expand All @@ -68,19 +87,19 @@ where

impl<F, U> Callbacks for CallbacksFn<F, U>
where
F: Fn(Packet<'_, U>, &[u8]) + Sync,
F: Fn(Packet<'_, U>, Option<Reply<'_>>) + Sync,
U: UserDataPtr,
{
type UserDataPtr = U;

fn on_completion(&self, packet: Packet<'_, Self::UserDataPtr>, payload: &[u8]) {
(self.inner)(packet, payload)
fn on_completion(&self, packet: Packet<'_, Self::UserDataPtr>, reply: Option<Reply<'_>>) {
(self.inner)(packet, reply)
}
}

pub const fn on_completion_fn<U, F>(f: F) -> CallbacksFn<F, U>
where
F: Fn(Packet<'_, U>, &[u8]) + Sync,
F: Fn(Packet<'_, U>, Option<Reply<'_>>) + Sync,
U: UserDataPtr,
{
CallbacksFn::new(f)
Expand All @@ -90,16 +109,18 @@ pub(crate) unsafe extern "C" fn on_completion_raw_fn<F>(
ctx: usize,
raw_client: sys::tb_client_t,
packet: *mut sys::tb_packet_t,
timestamp: u64,
payload: *const u8,
payload_size: u32,
) where
F: Callbacks,
{
let _ = catch_unwind(|| {
let cb = &*sptr::from_exposed_addr::<F>(ctx);
let payload_size = payload_size
.try_into()
.expect("At the start of calling on_completion callback: unable to convert payload_size from u32 into usize");
let payload_size = payload_size.try_into().expect(
"at the start of calling `on_completion` callback: \
unable to convert `payload_size` from `u32` into `usize`",
);
let payload = if payload_size != 0 {
slice::from_raw_parts(payload, payload_size)
} else {
Expand All @@ -112,7 +133,8 @@ pub(crate) unsafe extern "C" fn on_completion_raw_fn<F>(
on_completion: cb,
},
};
cb.on_completion(packet, payload)
let timestamp = SystemTime::UNIX_EPOCH + Duration::from_nanos(timestamp);
cb.on_completion(packet, Some(Reply { payload, timestamp }))
});
}

Expand Down
4 changes: 2 additions & 2 deletions core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ pub use query_filter::QueryFilter;
pub use transfer::Transfer;

type OnCompletionRawFn =
unsafe extern "C" fn(usize, sys::tb_client_t, *mut sys::tb_packet_t, *const u8, u32);
unsafe extern "C" fn(usize, sys::tb_client_t, *mut sys::tb_packet_t, u64, *const u8, u32);

pub struct Client<F>
where
Expand Down Expand Up @@ -84,7 +84,7 @@ where
let mut raw = mem::zeroed();
let status = sys::tb_client_init(
&mut raw,
cluster_id,
cluster_id.to_ne_bytes().as_ptr(),
address.as_ptr().cast(),
address
.len()
Expand Down
2 changes: 1 addition & 1 deletion core/src/packet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ where
let data = self.user_data().data();
let Ok(data_size) = data.len().try_into() else {
self.set_status(Err(SendErrorKind::TooMuchData.into()));
self.handle.on_completion.on_completion(self, &[]);
self.handle.on_completion.on_completion(self, None);
return;
};
let data = data.as_ptr();
Expand Down
2 changes: 1 addition & 1 deletion src/id.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ mod id_spec {
}

// Port of upstream test:
// https://github.com/tigerbeetle/tigerbeetle/blob/0.16.11/src/clients/go/pkg/types/main_test.go#L75-L115
// https://github.com/tigerbeetle/tigerbeetle/blob/0.16.17/src/clients/go/pkg/types/main_test.go#L75-L115
#[test]
fn monotonic_fuzz() {
fn verifier() {
Expand Down
12 changes: 10 additions & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,13 +161,21 @@ impl Client {
impl core::Callbacks for Callbacks {
type UserDataPtr = Box<UserData>;

fn on_completion(&self, packet: core::Packet<'_, Self::UserDataPtr>, payload: &[u8]) {
fn on_completion(
&self,
packet: core::Packet<'_, Self::UserDataPtr>,
reply: Option<core::Reply<'_>>,
) {
let status = packet.status();
let operation = packet.operation();
let user_data = packet.into_user_data();
user_data
.reply_sender
.send(status.map(|()| Reply::copy_from_reply(operation.kind(), payload)))
.send(status.map(|()| {
// PANIC: Unwrapping is OK here, because the `reply` can only be `None` when the
// `status` is `Err`.
Reply::copy_from_reply(operation.kind(), reply.unwrap().payload)
}))
.unwrap_or_else(|_| panic!("Unexpected: reply receiver is already dropped"));
}
}
Expand Down
12 changes: 7 additions & 5 deletions sys/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@ use syn::visit::Visit;
/// Version of the used [TigerBeetle] release.
///
/// [TigerBeetle]: https://github.com/tigerbeetle/tigerbeetle
const TIGERBEETLE_RELEASE: &str = "0.16.11";
const TIGERBEETLE_RELEASE: &str = "0.16.17";

/// Commit hash of the [`TIGERBEETLE_RELEASE`].
const TIGERBEETLE_COMMIT: &str = "ea8a3e445fd1801d8f5ad1dbd6a9320861053912";
const TIGERBEETLE_COMMIT: &str = "19a37355a64d09b0e35b14e5d1699e098bffdab8";

fn target_to_lib_dir(target: &str) -> Option<&'static str> {
match target {
Expand Down Expand Up @@ -54,11 +54,14 @@ const SCRIPT_EXTENSION: &str = "bat";
fn main() {
assert!(env!("CARGO_PKG_VERSION").ends_with(TIGERBEETLE_RELEASE));
let out_dir: PathBuf = env::var("OUT_DIR").unwrap().into();
let debug: bool = env::var("DEBUG").unwrap().parse().unwrap();
let debug: bool = env::var("TB_CLIENT_DEBUG").map_or_else(
|_| env::var("DEBUG").unwrap().parse().unwrap(),
|s| s.parse().unwrap(),
);
let target = env::var("TARGET").unwrap();
let log_level = env::var("TIGERBEETLE_LOG_LEVEL").unwrap_or_else(|_| "info".to_owned());

println!("cargo:rerun-if-env-changed=DOCS_RS");
println!("cargo:rerun-if-env-changed=TB_CLIENT_DEBUG");
println!("cargo:rerun-if-changed=src/wrapper.h");

let wrapper;
Expand Down Expand Up @@ -110,7 +113,6 @@ fn main() {
.arg("clients:c")
.args((!debug).then_some("-Drelease"))
.arg(format!("-Dtarget={tigerbeetle_target}"))
.arg(format!("-Dconfig-log-level={log_level}"))
.arg(format!("-Dconfig-release={TIGERBEETLE_RELEASE}"))
.arg(format!("-Dconfig-release-client-min={TIGERBEETLE_RELEASE}"))
.arg(format!("-Dgit-commit={TIGERBEETLE_COMMIT}"))
Expand Down
4 changes: 2 additions & 2 deletions sys/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ mod linked {
let address = "3000".as_bytes();
_ = crate::tb_client_init(
&mut raw,
1,
1_u128.to_ne_bytes().as_ptr(),
address.as_ptr().cast(),
address.len().try_into().unwrap(),
ptr::null::<()>() as usize,
Expand All @@ -47,7 +47,7 @@ mod linked {
let address = "3000".as_bytes();
_ = crate::tb_client_init_echo(
&mut raw,
1,
1_u128.to_ne_bytes().as_ptr(),
address.as_ptr().cast(),
address.len().try_into().unwrap(),
ptr::null::<()>() as usize,
Expand Down
2 changes: 1 addition & 1 deletion sys/tigerbeetle
Loading