Skip to content

Commit

Permalink
use 0.4.0 of fluvios-socket in the client (#458)
Browse files Browse the repository at this point in the history
* use 0.4.0 of fluvios-socket in the client
* apply cargo fmt and clippy
* make wait time for dispatcher wait time configurable
* force resync service controller every 60 seconds
* update fluvio cli
* add macos back to ci
  • Loading branch information
sehz authored Nov 11, 2020
1 parent 298b5df commit 42b975d
Show file tree
Hide file tree
Showing 15 changed files with 110 additions and 67 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ jobs:
runs-on: ${{ matrix.os }}
strategy:
matrix:
os: [ubuntu-latest]
os: [ubuntu-latest,macOS-latest]
rust: [stable]

steps:
Expand Down
49 changes: 38 additions & 11 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions src/cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,8 @@ k8-config = { version = "1.3.0", features = ["context"] }
k8-obj-core = { version = "1.1.0" }
k8-obj-metadata = { version = "1.0.0" }
k8-metadata-client = { version = "1.0.0" }
fluvio = { version = "0.2.2", path = "../client", default-features = false }
fluvio-sc = { version = "0.3.0", path = "../sc", optional = true, default-features = false }
fluvio = { version = "0.2.3", path = "../client", default-features = false }
fluvio-sc = { version = "0.3.1", path = "../sc", optional = true, default-features = false }
fluvio-sc-schema = { version = "0.2.0", path = "../sc-schema", features = ["use_serde"] }
fluvio-spu = { version = "0.2.0", path = "../spu", optional = true }
fluvio-controlplane-metadata = { version = "0.2.0", path = "../controlplane-metadata", features = ["use_serde", "k8"] }
Expand Down
4 changes: 2 additions & 2 deletions src/client/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "fluvio"
version = "0.2.2"
version = "0.2.3"
edition = "2018"
license = "Apache-2.0"
authors = ["Fluvio Contributors <[email protected]>"]
Expand Down Expand Up @@ -39,7 +39,7 @@ fluvio-future = { version = "0.1.4", default-features = false }
fluvio-types = { version = "0.1.0", path = "../types" }
fluvio-sc-schema = { version = "0.2.0", path = "../sc-schema", default-features = false }
fluvio-spu-schema = { version = "0.1.0", path = "../spu-schema" }
fluvio-socket = { version = "0.3.0" }
fluvio-socket = { version = "0.4.0" }
fluvio-protocol = { version = "0.2.0" }
dataplane = { version = "0.1.0", path = "../dataplane-protocol", package = "fluvio-dataplane-protocol" }

Expand Down
3 changes: 1 addition & 2 deletions src/client/src/admin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,7 @@ impl FluvioAdmin {
debug!("connected to cluster at: {}", inner_client.config().addr());

let (socket, config, versions) = inner_client.split();
let socket = AllMultiplexerSocket::new(socket);
let socket = socket.create_serial_socket().await;
let socket = AllMultiplexerSocket::shared(socket);

let versioned_socket = VersionedSerialSocket::new(socket, config, versions);
Ok(Self(versioned_socket))
Expand Down
10 changes: 7 additions & 3 deletions src/client/src/client/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use dataplane::api::RequestMessage;
use dataplane::api::Request;
use fluvio_spu_schema::server::versions::{ApiVersions, ApiVersionsRequest};
use fluvio_socket::FlvSocketError;
use fluvio_socket::{AllFlvSocket, AllSerialSocket};
use fluvio_socket::{AllFlvSocket, SharedAllMultiplexerSocket};

#[cfg(feature = "rust_tls")]
use fluvio_future::tls::AllDomainConnector;
Expand Down Expand Up @@ -205,7 +205,7 @@ impl Versions {

/// Connection that perform request/response
pub struct VersionedSerialSocket {
socket: AllSerialSocket,
socket: SharedAllMultiplexerSocket,
config: ClientConfig,
versions: Versions,
}
Expand All @@ -217,7 +217,11 @@ impl fmt::Display for VersionedSerialSocket {
}

impl VersionedSerialSocket {
pub fn new(socket: AllSerialSocket, config: ClientConfig, versions: Versions) -> Self {
pub fn new(
socket: SharedAllMultiplexerSocket,
config: ClientConfig,
versions: Versions,
) -> Self {
Self {
socket,
config,
Expand Down
10 changes: 5 additions & 5 deletions src/client/src/client/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::convert::TryFrom;

use tracing::{debug, trace};

use fluvio_socket::AllMultiplexerSocket;
use fluvio_socket::{AllMultiplexerSocket, SharedAllMultiplexerSocket};

#[cfg(feature = "native_tls")]
use fluvio_future::native_tls::AllDomainConnector;
Expand All @@ -23,7 +23,7 @@ use super::*;

/// An interface for interacting with Fluvio streaming
pub struct Fluvio {
socket: AllMultiplexerSocket,
socket: SharedAllMultiplexerSocket,
config: ClientConfig,
versions: Versions,
spu_pool: SpuPool,
Expand Down Expand Up @@ -81,9 +81,9 @@ impl Fluvio {
debug!("connected to cluster at: {}", inner_client.config().addr());

let (socket, config, versions) = inner_client.split();
let mut socket = AllMultiplexerSocket::new(socket);
let socket = AllMultiplexerSocket::shared(socket);

let metadata = MetadataStores::new(&mut socket).await?;
let metadata = MetadataStores::new(&socket).await?;
let spu_pool = SpuPool::new(config.clone(), metadata);

Ok(Self {
Expand Down Expand Up @@ -172,7 +172,7 @@ impl Fluvio {
/// create serial connection
async fn create_serial_client(&self) -> VersionedSerialSocket {
VersionedSerialSocket::new(
self.socket.create_serial_socket().await,
self.socket.clone(),
self.config.clone(),
self.versions.clone(),
)
Expand Down
8 changes: 4 additions & 4 deletions src/client/src/spu/spu_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use dataplane::ReplicaKey;
use dataplane::api::Request;
use dataplane::api::RequestMessage;
use fluvio_types::SpuId;
use fluvio_socket::AllMultiplexerSocket;
use fluvio_socket::{AllMultiplexerSocket, SharedAllMultiplexerSocket};
use fluvio_socket::AsyncResponse;
use crate::FluvioError;
use crate::client::ClientConfig;
Expand All @@ -20,14 +20,14 @@ const DEFAULT_STREAM_QUEUE_SIZE: usize = 10;

struct SpuSocket {
config: ClientConfig,
socket: AllMultiplexerSocket,
socket: SharedAllMultiplexerSocket,
versions: Versions,
}

impl SpuSocket {
async fn create_serial_socket(&mut self) -> VersionedSerialSocket {
VersionedSerialSocket::new(
self.socket.create_serial_socket().await,
self.socket.clone(),
self.config.clone(),
self.versions.clone(),
)
Expand Down Expand Up @@ -75,7 +75,7 @@ impl SpuPool {
let versioned_socket = client_config.connect().await?;
let (socket, config, versions) = versioned_socket.split();
Ok(SpuSocket {
socket: AllMultiplexerSocket::new(socket),
socket: AllMultiplexerSocket::shared(socket),
config,
versions,
})
Expand Down
12 changes: 7 additions & 5 deletions src/client/src/sync/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,11 +88,12 @@ where
async fn dispatch_loop(mut self, mut response: AsyncResponse<WatchRequest>) {
use tokio::select;

debug!("starting dispatch loop");
debug!("{} starting dispatch loop", S::LABEL);

loop {
// check if shutdown is set
if self.shutdown.is_set() {
debug!("{} shutdown exiting", S::LABEL);
break;
}

Expand All @@ -102,7 +103,7 @@ where
}

item = response.next() => {
debug!("received request");
debug!("{} received request",S::LABEL);

match item {
Some(Ok(watch_response)) => {
Expand All @@ -120,18 +121,19 @@ where
}
},
Some(Err(err)) => {
error!("error receiving, end, {}", err);
error!("{} error receiving, end, {}", S::LABEL, err);
break;
},
None => {
error!("No more items to receive from stream!")
debug!("{} No more items to receive from stream!",S::LABEL);
break;
}
}
}
}
}

debug!("shutting down");
debug!("{} terminated", S::LABEL);
}

async fn process_updates(&mut self, updates: MetadataUpdate<S>) -> Result<(), IoError> {
Expand Down
6 changes: 3 additions & 3 deletions src/client/src/sync/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ pub struct MetadataStores {

impl MetadataStores {
/// crete store and set up sync controllers
pub async fn new(socket: &mut AllMultiplexerSocket) -> Result<Self, FlvSocketError> {
pub async fn new(socket: &AllMultiplexerSocket) -> Result<Self, FlvSocketError> {
let store = Self {
shutdown: SimpleEvent::shared(),
spus: StoreContext::new(),
Expand Down Expand Up @@ -49,7 +49,7 @@ impl MetadataStores {
/// start watch for spu
pub async fn start_watch_for_spu(
&self,
socket: &mut AllMultiplexerSocket,
socket: &AllMultiplexerSocket,
) -> Result<(), FlvSocketError> {
use dataplane::api::RequestMessage;
use fluvio_sc_schema::objects::WatchRequest;
Expand All @@ -70,7 +70,7 @@ impl MetadataStores {

pub async fn start_watch_for_partition(
&self,
socket: &mut AllMultiplexerSocket,
socket: &AllMultiplexerSocket,
) -> Result<(), FlvSocketError> {
use dataplane::api::RequestMessage;
use fluvio_sc_schema::objects::WatchRequest;
Expand Down
2 changes: 1 addition & 1 deletion src/cluster/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ async-trait = "0.1.21"


# Fluvio dependencies
fluvio = { version = "0.2.0", path = "../client", default-features = false }
fluvio = { version = "0.2.3", path = "../client", default-features = false }
fluvio-controlplane-metadata = { version = "0.2.0", path = "../controlplane-metadata", features = ["k8"] }
fluvio-future = { version = "0.1.0" }
flv-util = "0.5.2"
Expand Down
2 changes: 1 addition & 1 deletion src/sc/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
name = "fluvio-sc"
edition = "2018"
version = "0.3.0"
version = "0.3.1"
authors = ["fluvio.io"]
description = "Fluvio Stream Controller"
repository = "https://github.com/infinyon/fluvio"
Expand Down
Loading

0 comments on commit 42b975d

Please sign in to comment.