Skip to content

Commit

Permalink
Order asynchronous operations at method call but not future evaluation
Browse files Browse the repository at this point in the history
Resolves #6.
  • Loading branch information
kezhuw committed Jul 10, 2022
1 parent abfcde5 commit e4d68f8
Show file tree
Hide file tree
Showing 9 changed files with 548 additions and 310 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,4 @@ pretty_assertions = "1.1.0"
test-case = "1.2.3"
testcontainers = { git = "https://github.com/kezhuw/testcontainers-rs.git", branch = "zookeeper-client" }
futures = "0.3.21"
speculoos = "0.9.0"
481 changes: 314 additions & 167 deletions src/client/mod.rs

Large diffs are not rendered by default.

9 changes: 9 additions & 0 deletions src/proto/consts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,16 @@ use num_enum::{IntoPrimitive, TryFromPrimitive};
#[derive(Copy, Clone, Debug, PartialEq, Eq, IntoPrimitive)]
pub enum PredefinedXid {
Notification = -1,
/// ZooKeeper server [hard-code -2 as ping response xid][ping-xid], so we have to use this and make sure
/// at most one ping in wire.
///
/// ping-xid: https://github.com/apache/zookeeper/blob/de7c5869d372e46af43979134d0e30b49d2319b1/zookeeper-server/src/main/java/org/apache/zookeeper/server/FinalRequestProcessor.java#L215
Ping = -2,

/// Fortunately, ZooKeeper server [use xid from header](auth-xid) to reply auth request, so we can have
/// multiple auth requets in network.
///
/// auth-xid: https://github.com/apache/zookeeper/blob/de7c5869d372e46af43979134d0e30b49d2319b1/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java#L1621
Auth = -4,
SetWatches = -8,
}
Expand Down
1 change: 0 additions & 1 deletion src/proto/request_header.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ impl RequestHeader {
pub fn with_code(code: OpCode) -> RequestHeader {
let xid = match code {
OpCode::Ping => PredefinedXid::Ping.into(),
OpCode::Auth => PredefinedXid::Auth.into(),
OpCode::SetWatches | OpCode::SetWatches2 => PredefinedXid::SetWatches.into(),
_ => 0,
};
Expand Down
107 changes: 52 additions & 55 deletions src/session/depot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,25 +4,23 @@ use std::io::{self, IoSlice};
use hashbrown::HashMap;
use strum::IntoEnumIterator;
use tokio::net::TcpStream;
use tokio::sync::oneshot;

use super::request::{self, MarshalledRequest, Operation, SessionOperation, StateResponser};
use super::request::{MarshalledRequest, Operation, SessionOperation, StateResponser};
use super::types::WatchMode;
use super::xid::Xid;
use super::SessionId;
use crate::error::Error;
use crate::proto::{AuthPacket, OpCode, RemoveWatchesRequest};

pub type AuthResponser = oneshot::Sender<Result<(), Error>>;
use crate::proto::{OpCode, PredefinedXid, RemoveWatchesRequest};

#[derive(Default)]
pub struct Depot {
xid: Xid,

pending_authes: Vec<SessionOperation>,

writing_slices: Vec<IoSlice<'static>>,
writing_operations: VecDeque<Operation>,
written_operations: VecDeque<SessionOperation>,
pending_auth: Option<(AuthPacket, AuthResponser)>,
written_operations: HashMap<i32, SessionOperation>,

watching_paths: HashMap<(&'static str, WatchMode), usize>,
unwatching_paths: HashMap<(&'static str, WatchMode), SessionOperation>,
Expand All @@ -33,10 +31,10 @@ impl Depot {
let writing_capacity = 128usize;
Depot {
xid: Default::default(),
pending_authes: Vec::with_capacity(5),
writing_slices: Vec::with_capacity(writing_capacity),
writing_operations: VecDeque::with_capacity(writing_capacity),
written_operations: VecDeque::with_capacity(128),
pending_auth: None,
written_operations: HashMap::with_capacity(128),
watching_paths: HashMap::with_capacity(32),
unwatching_paths: HashMap::with_capacity(32),
}
Expand All @@ -45,28 +43,40 @@ impl Depot {
pub fn for_connecting() -> Depot {
Depot {
xid: Default::default(),
pending_authes: Default::default(),
writing_slices: Vec::with_capacity(10),
writing_operations: VecDeque::with_capacity(10),
written_operations: VecDeque::with_capacity(10),
pending_auth: None,
written_operations: HashMap::with_capacity(10),
watching_paths: HashMap::new(),
unwatching_paths: HashMap::new(),
}
}

/// Clear all buffered operations from previous run.
pub fn clear(&mut self) {
self.pending_authes.clear();
self.writing_slices.clear();
self.watching_paths.clear();
self.unwatching_paths.clear();
self.writing_operations.clear();
self.written_operations.clear();
}

pub fn error(&mut self, err: Error) {
self.written_operations.drain(..).for_each(|operation| {
/// Error out ongoing operations except authes.
pub fn error(&mut self, err: &Error) {
self.written_operations.drain().for_each(|(_, operation)| {
if operation.request.get_code() == OpCode::Auth {
self.pending_authes.push(operation);
return;
}
operation.responser.send(Err(err.clone()));
});
self.writing_operations.drain(..).for_each(|operation| {
if let Operation::Session(operation) = operation {
if operation.request.get_code() == OpCode::Auth {
self.pending_authes.push(operation);
return;
}
operation.responser.send(Err(err.clone()));
}
});
Expand All @@ -77,42 +87,28 @@ impl Depot {
self.watching_paths.clear();
}

pub fn is_empty(&self) -> bool {
self.writing_operations.is_empty() && self.written_operations.is_empty()
}

pub fn pop_pending_auth(&mut self) -> Option<(AuthPacket, AuthResponser)> {
self.pending_auth.take()
/// Terminate all ongoing operations including authes.
pub fn terminate(&mut self, err: Error) {
self.error(&err);
for SessionOperation { responser, .. } in self.pending_authes.drain(..) {
responser.send(Err(err.clone()));
}
}

pub fn has_pending_auth(&self) -> bool {
self.pending_auth.is_some()
/// Check whether there is any ongoing operations.
pub fn is_empty(&self) -> bool {
self.writing_operations.is_empty() && self.written_operations.is_empty()
}

pub fn pop_reqeust(&mut self, xid: i32) -> Result<SessionOperation, Error> {
match self.written_operations.pop_front() {
pub fn pop_request(&mut self, xid: i32) -> Result<SessionOperation, Error> {
match self.written_operations.remove(&xid) {
None => Err(Error::UnexpectedError(format!("recv response with xid {} but no pending request", xid))),
Some(operation) => {
let request_xid = operation.request.get_xid();
if xid == request_xid {
return Ok(operation);
}
self.written_operations.push_front(operation);
Err(Error::UnexpectedError(format!("expect response xid {} but got {}", xid, request_xid)))
},
Some(operation) => Ok(operation),
}
}

pub fn pop_ping(&mut self) -> Result<(), Error> {
if let Some(operation) = self.written_operations.pop_front() {
let op_code = operation.request.get_code();
if op_code != OpCode::Ping {
self.written_operations.push_front(operation);
return Err(Error::UnexpectedError(format!("expect pending ping request, got {}", op_code)));
}
return Ok(());
}
Err(Error::UnexpectedError("expect pending ping request, got none".to_string()))
self.pop_request(PredefinedXid::Ping.into()).map(|_| ())
}

pub fn push_operation(&mut self, operation: Operation) {
Expand All @@ -126,9 +122,11 @@ impl Depot {
}

pub fn start(&mut self) {
if let Some((auth, responser)) = self.pending_auth.take() {
self.push_auth(auth, responser);
let mut pending_authes = std::mem::take(&mut self.pending_authes);
for operation in pending_authes.drain(..) {
self.push_session(operation);
}
self.pending_authes = pending_authes;
}

fn cancel_unwatch(&mut self, path: &'static str, mode: WatchMode) {
Expand Down Expand Up @@ -195,12 +193,6 @@ impl Depot {
.any(|mode| self.watching_paths.contains_key(&(path, mode)))
}

pub fn push_auth(&mut self, auth: AuthPacket, responser: AuthResponser) {
let operation = request::build_auth_operation(OpCode::Auth, &auth);
self.pending_auth = Some((auth, responser));
self.push_operation(Operation::Auth(operation));
}

pub fn write_operations(&mut self, sock: &TcpStream, session_id: SessionId) -> Result<(), Error> {
let result = sock.try_write_vectored(self.writing_slices.as_slice());
let mut written_bytes = match result {
Expand All @@ -226,13 +218,18 @@ impl Depot {
.unwrap_or(self.writing_slices.len());
if written_slices != 0 {
self.writing_slices.drain(..written_slices);
let written = self.writing_operations.drain(..written_slices).filter_map(|operation| {
if let Operation::Session(operation) = operation {
return Some(operation);
}
None
});
self.written_operations.extend(written);
self.writing_operations
.drain(..written_slices)
.filter_map(|operation| {
if let Operation::Session(operation) = operation {
return Some(operation);
}
None
})
.for_each(|operation| {
let xid = operation.request.get_xid();
self.written_operations.insert(xid, operation);
});
}
if written_bytes != 0 {
let (_, rest) = self.writing_slices[0].split_at(written_bytes);
Expand Down
Loading

0 comments on commit e4d68f8

Please sign in to comment.