Skip to content

Commit

Permalink
introduce SpuDirectory trait for Fluvio Client API (#1863)
Browse files Browse the repository at this point in the history
resolves #1862
  • Loading branch information
sehz committed Nov 7, 2021
1 parent f65b472 commit a739ea6
Show file tree
Hide file tree
Showing 4 changed files with 83 additions and 47 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
* Add `#[smartstream(filter_map)]` for filtering and transforming at the same time. ([#1826](https://github.com/infinyon/fluvio/issues/1826))
* Add table display output option to consumer for json objects ([#1642](https://github.com/infinyon/fluvio/issues/1642))
* Streamlined Admin API ([#1803](https://github.com/infinyon/fluvio/issues/1803))
* Add SpuDirectory trait to Fluvio Client ([#1863](https://github.com/infinyon/fluvio/issues/1863))

## Platform Version 0.9.12 - 2021-10-27
* Add examples for ArrayMap. ([#1804](https://github.com/infinyon/fluvio/issues/1804))
Expand Down
2 changes: 1 addition & 1 deletion crates/fluvio/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ pin-project-lite = "0.2"
siphasher = "0.3.5"
cfg-if = "1.0.0"
derive_builder = "0.10"
async-trait = "0.1.51"

# Fluvio dependencies
fluvio-future = { version = "0.3.5", features = ["task", "openssl_tls", "task_unstable"] }
Expand All @@ -66,6 +67,5 @@ async-std = { version = "1.6.4", default-features = false }
fluvio-future = { version = "0.3.0", features = ["fixture"] }

[target.'cfg(target_arch = "wasm32")'.dev-dependencies]
async-trait = "0.1.50"
wasm-bindgen-test = "0.3.24"
fluvio_ws_stream_wasm = "0.7.0"
13 changes: 8 additions & 5 deletions crates/fluvio/src/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use fluvio_types::event::offsets::OffsetPublisher;

use crate::FluvioError;
use crate::offset::{Offset, fetch_offsets};
use crate::spu::SpuPool;
use crate::spu::{SpuDirectory, SpuPool};
use derive_builder::Builder;

/// An interface for consuming events from a particular partition
Expand Down Expand Up @@ -60,14 +60,17 @@ use derive_builder::Builder;
/// [`Offset`]: struct.Offset.html
/// [`partition_consumer`]: struct.Fluvio.html#method.partition_consumer
/// [`Fluvio`]: struct.Fluvio.html
pub struct PartitionConsumer {
pub struct PartitionConsumer<P = SpuPool> {
topic: String,
partition: i32,
pool: Arc<SpuPool>,
pool: Arc<P>,
}

impl PartitionConsumer {
pub(crate) fn new(topic: String, partition: i32, pool: Arc<SpuPool>) -> Self {
impl<P> PartitionConsumer<P>
where
P: SpuDirectory,
{
pub(crate) fn new(topic: String, partition: i32, pool: Arc<P>) -> Self {
Self {
topic,
partition,
Expand Down
114 changes: 73 additions & 41 deletions crates/fluvio/src/spu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::collections::HashMap;

use tracing::{debug, trace, instrument};
use async_lock::Mutex;
use async_trait::async_trait;

use dataplane::ReplicaKey;
use dataplane::api::Request;
Expand All @@ -17,6 +18,31 @@ use crate::sockets::Versions;

const DEFAULT_STREAM_QUEUE_SIZE: usize = 10;

/// used for connectiong to spu
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
pub trait SpuDirectory {
/// Create request/response socket to SPU for a replica
///
/// All sockets to same SPU use a single TCP connection.
/// First this looks up SPU address in SPU metadata and try to see if there is an existing TCP connection.
/// If not, it will create a new connection and creates socket to it
async fn create_serial_socket(
&self,
replica: &ReplicaKey,
) -> Result<VersionedSerialSocket, FluvioError>;

/// create stream to leader replica
async fn create_stream_with_version<R: Request>(
&self,
replica: &ReplicaKey,
request: R,
version: i16,
) -> Result<AsyncResponse<R>, FluvioError>
where
R: Sync + Send;
}

struct SpuSocket {
config: Arc<ClientConfig>,
socket: SharedMultiplexerSocket,
Expand Down Expand Up @@ -103,13 +129,54 @@ impl SpuPool {
})
}

#[instrument(skip(self))]
pub async fn create_serial_socket_from_leader(
&self,
leader_id: SpuId,
) -> Result<VersionedSerialSocket, FluvioError> {
// check if already have existing connection to same SPU
let mut client_lock = self.spu_clients.lock().await;

if let Some(spu_socket) = client_lock.get_mut(&leader_id) {
if !spu_socket.is_stale() {
return Ok(spu_socket.create_serial_socket().await);
} else {
client_lock.remove(&leader_id);
}
}

let mut spu_socket = self.connect_to_leader(leader_id).await?;
let serial_socket = spu_socket.create_serial_socket().await;
client_lock.insert(leader_id, spu_socket);

Ok(serial_socket)
}

pub async fn topic_exists<S: Into<String>>(&self, topic: S) -> Result<bool, FluvioError> {
let replica = ReplicaKey::new(topic, 0);
Ok(self
.metadata
.partitions()
.lookup_by_key(&replica)
.await?
.is_some())
}

pub fn shutdown(&mut self) {
self.metadata.shutdown();
}
}

#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
impl SpuDirectory for SpuPool {
/// Create request/response socket to SPU for a replica
///
/// All sockets to same SPU use a single TCP connection.
/// First this looks up SPU address in SPU metadata and try to see if there is an existing TCP connection.
/// If not, it will create a new connection and creates socket to it
#[instrument(skip(self, replica))]
pub async fn create_serial_socket(
async fn create_serial_socket(
&self,
replica: &ReplicaKey,
) -> Result<VersionedSerialSocket, FluvioError> {
Expand All @@ -128,37 +195,16 @@ impl SpuPool {
Ok(socket)
}

#[instrument(skip(self))]
pub async fn create_serial_socket_from_leader(
&self,
leader_id: SpuId,
) -> Result<VersionedSerialSocket, FluvioError> {
// check if already have existing connection to same SPU
let mut client_lock = self.spu_clients.lock().await;

if let Some(spu_socket) = client_lock.get_mut(&leader_id) {
if !spu_socket.is_stale() {
return Ok(spu_socket.create_serial_socket().await);
} else {
client_lock.remove(&leader_id);
}
}

let mut spu_socket = self.connect_to_leader(leader_id).await?;
let serial_socket = spu_socket.create_serial_socket().await;
client_lock.insert(leader_id, spu_socket);

Ok(serial_socket)
}

/// create stream to leader replica
#[instrument(skip(self, replica, request, version))]
pub async fn create_stream_with_version<R: Request>(
async fn create_stream_with_version<R: Request>(
&self,
replica: &ReplicaKey,
request: R,
version: i16,
) -> Result<AsyncResponse<R>, FluvioError> {
) -> Result<AsyncResponse<R>, FluvioError>
where
R: Sync + Send,
{
let partition_search = self.metadata.partitions().lookup_by_key(replica).await?;

let partition = if let Some(partition) = partition_search {
Expand Down Expand Up @@ -189,18 +235,4 @@ impl SpuPool {

Ok(stream)
}

pub async fn topic_exists<S: Into<String>>(&self, topic: S) -> Result<bool, FluvioError> {
let replica = ReplicaKey::new(topic, 0);
Ok(self
.metadata
.partitions()
.lookup_by_key(&replica)
.await?
.is_some())
}

pub fn shutdown(&mut self) {
self.metadata.shutdown();
}
}

0 comments on commit a739ea6

Please sign in to comment.