Skip to content

Commit

Permalink
perf: Tokio compat (#1922)
Browse files Browse the repository at this point in the history
* Added tokio 0.2 and tokio-compat

Signed-off-by: MOZGIII <[email protected]>

* Disable file sink and files bench

Signed-off-by: MOZGIII <[email protected]>

* Upgrade tokio-threadpool with tokio02::task::block_in_place

Signed-off-by: MOZGIII <[email protected]>

* Add crate::test_util::trace_init at sinks::statsd::test::test_send_to_statsd

Signed-off-by: MOZGIII <[email protected]>

* Upgrade tokio-compat to a fixed version

Signed-off-by: MOZGIII <[email protected]>

* Switch to released tokio-compat 0.1.5

Signed-off-by: MOZGIII <[email protected]>
  • Loading branch information
MOZGIII authored Mar 18, 2020
1 parent 91ab532 commit ccd3cf7
Show file tree
Hide file tree
Showing 9 changed files with 117 additions and 71 deletions.
140 changes: 93 additions & 47 deletions Cargo.lock

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,12 @@ tracing-limit = { path = "lib/tracing-limit" }
futures01 = { package = "futures", version = "0.1.25" }
futures = { version = "0.3", default-features = false, features = ["compat"] }
tokio = { version = "0.1.22", features = ["io", "uds", "tcp", "rt-full", "experimental-tracing"], default-features = false }
tokio02 = { package = "tokio", version = "0.2", features = ["blocking"] }
tokio-codec = "0.1.0"
tokio-openssl = "0.3.0"
tokio-retry = "0.2.0"
tokio-signal = "0.2.7"
tokio-threadpool = "0.1.16"
tokio-compat = { version = "0.1", features = ["rt-full"] }

# Tracing
tracing = "0.1.9"
Expand Down
2 changes: 1 addition & 1 deletion benches/bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ criterion_main!(
buffering::buffers,
http::http,
batch::batch,
files::files,
/* files::files, */
lua::lua,
event::event
);
Expand Down
2 changes: 2 additions & 0 deletions benches/files.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
#![cfg(feature = "disabled")]

use bytes::Bytes;
use criterion::{criterion_group, Benchmark, Criterion, Throughput};
use futures01::{sink::Sink, stream::Stream, Future};
Expand Down
7 changes: 3 additions & 4 deletions src/buffers/disk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,14 +189,13 @@ impl Stream for Reader {

// This will usually complete instantly, but in the case of a large queue (or a fresh launch of
// the app), this will have to go to disk.
let next = tokio_threadpool::blocking(|| {
let next = tokio02::task::block_in_place(|| {
self.db
.get(ReadOptions::new(), Key(self.read_offset))
.unwrap()
})
.unwrap();
});

if let Async::Ready(Some(value)) = next {
if let Some(value) = next {
self.unacked_sizes.push_back(value.len());
self.read_offset += 1;

Expand Down
8 changes: 4 additions & 4 deletions src/runtime.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
use futures01::future::{ExecuteError, Executor, Future};
use std::io;
use std::pin::Pin;
use tokio::runtime::Builder;
use tokio_compat::runtime::{Builder, Runtime as TokioRuntime, TaskExecutor as TokioTaskExecutor};

pub struct Runtime {
rt: tokio::runtime::Runtime,
rt: TokioRuntime,
}

impl Runtime {
pub fn new() -> io::Result<Self> {
Ok(Runtime {
rt: tokio::runtime::Runtime::new()?,
rt: TokioRuntime::new()?,
})
}

Expand Down Expand Up @@ -70,7 +70,7 @@ impl Runtime {

#[derive(Clone, Debug)]
pub struct TaskExecutor {
inner: tokio::runtime::TaskExecutor,
inner: TokioTaskExecutor,
}

impl TaskExecutor {
Expand Down
2 changes: 2 additions & 0 deletions src/sinks/file/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
#![cfg(feature = "disabled")]

mod file;

use self::file::File;
Expand Down
22 changes: 8 additions & 14 deletions src/sinks/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,7 @@ use crate::{
};
use futures::compat::Compat;
use futures01::{
future::{self, poll_fn, IntoFuture},
stream::FuturesUnordered,
Async, AsyncSink, Future, Poll, Sink, StartSend, Stream,
future, stream::FuturesUnordered, Async, AsyncSink, Future, Poll, Sink, StartSend, Stream,
};
use rdkafka::{
consumer::{BaseConsumer, Consumer},
Expand Down Expand Up @@ -214,18 +212,14 @@ impl Sink for KafkaSink {
fn healthcheck(config: KafkaSinkConfig) -> super::Healthcheck {
let consumer: BaseConsumer = config.to_rdkafka().unwrap().create().unwrap();

let check = poll_fn(move || {
tokio_threadpool::blocking(|| {
consumer
.fetch_metadata(Some(&config.topic), Duration::from_secs(3))
.map(|_| ())
.map_err(|err| err.into())
})
})
.map_err(|err| err.into())
.and_then(|result| result.into_future());
let check = tokio02::task::block_in_place(|| {
consumer
.fetch_metadata(Some(&config.topic), Duration::from_secs(3))
.map(|_| ())
.map_err(|err| err.into())
});

Box::new(check)
Box::new(future::result(check))
}

fn encode_event(
Expand Down
2 changes: 2 additions & 0 deletions src/sinks/statsd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,8 @@ mod test {

#[test]
fn test_send_to_statsd() {
crate::test_util::trace_init();

let config = StatsdSinkConfig {
namespace: "vector".into(),
address: default_address(),
Expand Down

0 comments on commit ccd3cf7

Please sign in to comment.