Implement client-side timeouts #329

Closed
wants to merge 11 commits into from
145 changes: 108 additions & 37 deletions scylla/src/transport/connection.rs
@@ -62,9 +62,17 @@ pub struct Connection {

Collaborator

Regarding the commit message: I believe there are 32k streams available on a connection, not 16k.

Contributor Author

Yeah, but negative ids are reserved for events iirc, so users can de facto use 16k

Contributor Author

Oh wait, the pool is 64k and 32k are for the user, you're right
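For context, a rough sketch of the arithmetic behind those numbers (the reservation of the negative half is an assumption here, not something this PR defines):

fn main() {
    // Stream IDs in the CQL protocol are signed 16-bit integers.
    let total_ids: i32 = i16::MAX as i32 - i16::MIN as i32 + 1;
    // Assuming the negative half is reserved (e.g. for server events),
    // only the non-negative half is usable for client requests.
    let usable_by_client: i32 = i16::MAX as i32 + 1;
    assert_eq!(total_ids, 65_536); // "64k" total stream ids
    assert_eq!(usable_by_client, 32_768); // "32k" for the user
}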

type ResponseHandler = oneshot::Sender<Result<TaskResponse, QueryError>>;

struct Task {
serialized_request: SerializedRequest,
response_handler: ResponseHandler,
enum Task {
// Send a request to the server and return the allocated stream id
Request {
stream_id_sender: oneshot::Sender<i16>,
response_handler: ResponseHandler,
serialized_request: SerializedRequest,
},
// Mark a stream id as orphaned
Orphan {
stream_id: i16,
},
}

struct TaskResponse {
@@ -635,12 +643,13 @@ impl Connection {
};
let serialized_request = SerializedRequest::make(request, compression, tracing)?;

let (sender, receiver) = oneshot::channel();

let (response_handler, receiver) = oneshot::channel();
let (stream_id_sender, stream_id_receiver) = oneshot::channel();
self.submit_channel
.send(Task {
.send(Task::Request {
stream_id_sender,
response_handler,
serialized_request,
response_handler: sender,
})
.await
.map_err(|_| {
@@ -649,22 +658,35 @@
"Connection broken",
)))
})?;

let task_response = tokio::time::timeout(self.config.client_timeout, receiver)
let stream_id = stream_id_receiver
.await
.map_err(|e| {
QueryError::ClientTimeout(format!(
.map_err(|_| QueryError::UnableToAllocStreamId)?;
let received = match tokio::time::timeout(self.config.client_timeout, receiver).await {
Contributor

A library user can decide to stop polling the future returned by a Session::query* method, causing this line to be the last one executed. In that case no Task::Orphan message will be sent via self.submit_channel.

I think that speculative execution can also cancel futures, so the problem persists even without the user intentionally cancelling futures.

Contributor Author

That's true, but it's also no worse than what we have now - the connection would eventually run out of stream IDs and die. Users should be encouraged to use our timeout instead of a tokio timeout. Do you have any suggestions on how to approach this issue?

Collaborator

One idea that comes to my mind would be to make the router responsible for detecting timeouts and marking stream IDs as orphaned. It would keep a BTreeMap-based queue of stream IDs ordered by their deadlines.
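A minimal sketch of what such a deadline-ordered queue could look like on the router side (the names and types here are assumptions, not part of this PR):

use std::collections::BTreeMap;
use std::time::Instant;

// Stream IDs keyed by (deadline, id); BTreeMap keeps entries ordered by key,
// so everything that has timed out sits at the front of the map.
struct DeadlineQueue {
    deadlines: BTreeMap<(Instant, i16), ()>,
}

impl DeadlineQueue {
    fn insert(&mut self, deadline: Instant, stream_id: i16) {
        self.deadlines.insert((deadline, stream_id), ());
    }

    // Remove and return every stream id whose deadline has already passed;
    // the router would mark these as orphaned.
    fn pop_expired(&mut self, now: Instant) -> Vec<i16> {
        let still_live = self.deadlines.split_off(&(now, i16::MIN));
        let expired = std::mem::replace(&mut self.deadlines, still_live);
        expired.into_keys().map(|(_, id)| id).collect()
    }
}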

Contributor

I'm more for a different solution.
The problem with the current solution is that we are not able to deliver the orphanage notification to the router, due to future cancellation.

To ensure that some action is run in an async function even when the function's future was dropped, one can use the destructor of an object living in the body of that function. I propose introducing such an object into the send_request function and using it to notify the router about orphanage.

Since destructors cannot be async, notifying via a bounded channel as it is done now is not an option. Luckily, unbounded tokio channels do not require awaiting to send a value, so they can be used in Rust's Drop implementations.

use tokio::sync::mpsc::UnboundedSender;

struct OrphanhoodNotifier {
    disabled: bool, // initialized to false
    stream_id: i16,
    sender: UnboundedSender<i16>,
}

impl OrphanhoodNotifier {
    // Called once the response has been received, so that dropping the
    // notifier no longer reports the stream id as orphaned.
    fn disable(mut self) {
        self.disabled = true;
    }
}

impl Drop for OrphanhoodNotifier {
    fn drop(&mut self) {
        if !self.disabled {
            // Unbounded send never awaits, so it is usable inside Drop;
            // a send error (router already gone) can be ignored.
            let _ = self.sender.send(self.stream_id);
        }
    }
}

I imagine that the OrphanhoodNotifier would be sent instead of the raw stream_id via stream_id_sender (this would protect us from failing to notify about orphanage when send_request is cancelled after submitting Task::Request but before receiving the stream_id from stream_id_receiver). After receiving the query response, the orphan notification would be disabled by calling the .disable() method of OrphanhoodNotifier.

Such a design would also require changes to the router's way of allocating stream_ids and marking them as orphaned.
Why? Consider the following situation:

  1. A request is sent by send_request. Let's call this invocation of send_request "the first invocation".
  2. The request sent by the first invocation gets stream_id = 1.
  3. The connection's router executes this request and, after receiving the response, stream_id = 1 goes back to the pool.
  4. Next, another request is sent by another invocation of send_request (let's call it "the second invocation").
  5. The second request also gets stream_id = 1, because it was already available in the pool.
  6. Now, if a late orphanage notification somehow arrives from the first invocation of send_request, it would mark the wrong request (the request currently holding stream_id = 1, sent by the second invocation of send_request).

One way to get rid of this problem is to track generations of stream_ids.

struct OrphanhoodNotifier {
    disabled: bool, // initialized to false
    stream_id: i16,
    generation: u64, // new field denoting the stream_id's generation number
    sender: UnboundedSender<i16>,
}

On the router side, an associative array mapping stream_id -> current generation number would be kept. When a new stream_id is allocated, its generation number inside the associative array is incremented. When an orphanhood notification is received, there should be a check whether the generations match (the one in the notification and the one in the associative array). If the check succeeds, the stream_id can be considered orphaned. If it does not, we can be sure that the router already received the response and the stream_id was reused.

I believe this design is better than keeping a BTreeMap queue of stream IDs, because it allows handling timeouts by future cancellation (and therefore lets us implement timeouts without knowing the timeout duration in advance). Being able to handle cancellations is crucial, as the implementation of speculative execution heavily depends on them (via select!).
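A minimal sketch of the router-side generation bookkeeping described above (the names and the HashMap choice are assumptions):

use std::collections::HashMap;

// Router-side map: current generation number of every stream id.
struct Generations {
    current: HashMap<i16, u64>,
}

impl Generations {
    // Called whenever a stream id is (re)allocated; bumps and returns its generation.
    fn allocate(&mut self, stream_id: i16) -> u64 {
        let gen = self.current.entry(stream_id).or_insert(0);
        *gen += 1;
        *gen
    }

    // Called when an orphanhood notification arrives: the id is only marked
    // as orphaned if the notification still refers to the in-flight generation;
    // otherwise the id has already been freed and reused.
    fn matches_current(&self, stream_id: i16, generation: u64) -> bool {
        self.current.get(&stream_id) == Some(&generation)
    }
}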

Err(e) => {
self.submit_channel
.send(Task::Orphan { stream_id })
.await
.map_err(|_| {
QueryError::IoError(Arc::new(std::io::Error::new(
ErrorKind::Other,
"Failed to orphan a stream id",
)))
})?;
return Err(QueryError::ClientTimeout(format!(
"Request took longer than {}ms: {}",
self.config.client_timeout.as_millis(),
e
))
})?
.map_err(|_| {
QueryError::IoError(Arc::new(std::io::Error::new(
ErrorKind::Other,
"Connection broken",
)))
})??;
)));
}
Ok(resp) => resp,
};

let task_response = received.map_err(|_| {
QueryError::IoError(Arc::new(std::io::Error::new(
ErrorKind::Other,
"Connection broken",
)))
})??;

Self::parse_response(task_response, self.config.compression)
}
@@ -762,7 +784,6 @@ impl Connection {
let w = Self::writer(write_half, &handler_map, receiver);

let result = futures::try_join!(r, w);

let error: QueryError = match result {
Ok(_) => return, // Connection was dropped, we can return
Err(err) => err,
@@ -841,24 +862,43 @@ impl Connection {
// of the channel will be dropped, this task will return an error
// and the whole worker will be stopped
while let Some(task) = task_receiver.recv().await {
let stream_id = {
// We are guaranteed here that handler_map will not be locked
// by anybody else, so we can do try_lock().unwrap()
let mut lock = handler_map.try_lock().unwrap();

if let Some(stream_id) = lock.allocate(task.response_handler) {
stream_id
} else {
// TODO: Handle this error better, for now we drop this
// request and return an error to the receiver
error!("Could not allocate stream id");
continue;
match task {
Task::Request {
stream_id_sender,
response_handler,
serialized_request,
} => {
let stream_id = {
let mut hmap = handler_map.try_lock().unwrap();
// TODO: Handle stream id allocation error better,
// for now the error is propagated to the connection
if let Some(id) = hmap.allocate(response_handler) {
stream_id_sender
.send(id)
.map_err(|_| QueryError::UnableToAllocStreamId)?;
id
} else {
error!("Unable to allocate stream id");
continue;
}
};
let mut req = serialized_request;
req.set_stream(stream_id);
write_half.write_all(req.get_data()).await?;
}
};

let mut req = task.serialized_request;
req.set_stream(stream_id);
write_half.write_all(req.get_data()).await?;
Task::Orphan { stream_id } => {
let mut hmap = handler_map.try_lock().unwrap();
Collaborator

Here, we probably can remove the entry from handler_map corresponding to the orphaned stream ID.

Contributor Author

Why? The ID is orphaned, but a response to it can still arrive, and once it does, it will remove the entry as well. And if it doesn't and we eventually hit the max orphan count, the connection dies anyway. Or do you mean that only as an optimization?

Collaborator

I mainly meant it as an optimization - the memory used by the entry can be freed at this point because the contents of the entry won't be used (it's a Sender of a oneshot channel which won't be read from).
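A minimal sketch of that optimization, assuming orphan() can reach the handlers map directly (the PR's orphan() below currently only updates the orphanage bitmap):

pub fn orphan(&mut self, stream_id: i16) {
    if self.stream_set.contains(stream_id) && !self.orphanage.contains(stream_id) {
        self.orphan_count += 1;
        self.orphanage.insert(stream_id);
        // The response handler is a oneshot Sender whose Receiver side has been
        // dropped by the timed-out caller, so its memory can be freed right away.
        self.handlers.remove(&stream_id);
    }
}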

if hmap.orphan_count() >= 1024 {
//FIXME: make the orphan threshold configurable
warn!(
"Too many orphaned stream ids: {}, dropping connection",
hmap.orphan_count()
);
return Err(QueryError::TooManyOrphanedStreamIds(hmap.orphan_count()));
Collaborator

I'm a bit worried that immediately closing a connection in case of too many orphaned IDs may unnecessarily cause availability issues. What if all connections start timing out requests at the same rate and we decide to close all of them for that reason at the same time?

}
hmap.orphan(stream_id)
}
}
}

Ok(())
@@ -1074,13 +1114,17 @@ async fn connect_with_source_port(
struct ResponseHandlerMap {
stream_set: StreamIdSet,
handlers: HashMap<i16, ResponseHandler>,
orphan_count: u16,
orphanage: StreamIdSet,
}

impl ResponseHandlerMap {
pub fn new() -> Self {
Self {
stream_set: StreamIdSet::new(),
handlers: HashMap::new(),
orphan_count: 0,
orphanage: StreamIdSet::new(),
}
}

@@ -1093,9 +1137,24 @@

pub fn take(&mut self, stream_id: i16) -> Option<ResponseHandler> {
self.stream_set.free(stream_id);
if self.orphanage.contains(stream_id) {
self.orphanage.free(stream_id);
self.orphan_count -= 1;
}
self.handlers.remove(&stream_id)
}

pub fn orphan(&mut self, stream_id: i16) {
if self.stream_set.contains(stream_id) && !self.orphanage.contains(stream_id) {
self.orphan_count += 1;
self.orphanage.insert(stream_id)
}
}

pub fn orphan_count(&self) -> u16 {
self.orphan_count
}

// Retrieves the map of handlers, used after connection breaks
// and we have to respond to all of them with an error
pub fn into_handlers(self) -> HashMap<i16, ResponseHandler> {
@@ -1132,6 +1191,18 @@ impl StreamIdSet {
let off = stream_id as usize % 64;
self.used_bitmap[block_id] &= !(1 << off);
}

pub fn contains(&self, stream_id: i16) -> bool {
let block_id = stream_id as usize / 64;
let off = stream_id as usize % 64;
self.used_bitmap[block_id] & (1 << off) != 0
}

pub fn insert(&mut self, stream_id: i16) {
let block_id = stream_id as usize / 64;
let off = stream_id as usize % 64;
self.used_bitmap[block_id] |= 1 << off;
}
}

/// This type can only hold a valid keyspace name