Skip to content

Commit

Permalink
chore: Rename futures to futures01 (#1933)
Browse files Browse the repository at this point in the history
* Rename futures to futures01

This makes it easier for the whole team to spot old futures use and fix it
in their local piece of work.

Signed-off-by: MOZGIII <[email protected]>

* Remove alternative name for futures 0.3, making it the default

Signed-off-by: MOZGIII <[email protected]>
  • Loading branch information
MOZGIII authored Feb 26, 2020
1 parent 271bcbd commit 0b92159
Show file tree
Hide file tree
Showing 87 changed files with 231 additions and 229 deletions.
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ tracing-metrics = { path = "lib/tracing-metrics" }
tracing-limit = { path = "lib/tracing-limit" }

# Tokio / Futures
futures = "0.1.25"
futures03 = { package = "futures", version = "0.3", default-features = false, features = ["compat"] }
futures01 = { package = "futures", version = "0.1.25" }
futures = { version = "0.3", default-features = false, features = ["compat"] }
tokio = { version = "0.1.22", features = ["io", "uds", "tcp", "rt-full", "experimental-tracing"], default-features = false }
tokio-retry = "0.2.0"
tokio-signal = "0.2.7"
Expand Down
8 changes: 4 additions & 4 deletions benches/batch.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use bytes::Bytes;
use criterion::{criterion_group, Benchmark, Criterion, Throughput};
use futures::sync::mpsc;
use futures::{Future, Sink, Stream};
use futures01::sync::mpsc;
use futures01::{Future, Sink, Stream};
use vector::sinks::util::{Batch, BatchSink, Buffer, Partition, PartitionedBatchSink};
use vector::test_util::random_lines;

Expand All @@ -19,7 +19,7 @@ fn batching(
.take(num_events)
.map(|s| s.into_bytes())
.collect::<Vec<_>>();
futures::stream::iter_ok::<_, ()>(input.into_iter())
futures01::stream::iter_ok::<_, ()>(input.into_iter())
},
|input| {
let (tx, _rx) = mpsc::unbounded();
Expand Down Expand Up @@ -54,7 +54,7 @@ fn partitioned_batching(
key: key.clone(),
})
.collect::<Vec<_>>();
futures::stream::iter_ok::<_, ()>(input.into_iter())
futures01::stream::iter_ok::<_, ()>(input.into_iter())
},
|input| {
let (tx, _rx) = mpsc::unbounded();
Expand Down
8 changes: 4 additions & 4 deletions benches/bench.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use criterion::{criterion_group, criterion_main, Benchmark, Criterion, Throughput};

use approx::assert_relative_eq;
use futures::future;
use futures01::future;
use rand::distributions::{Alphanumeric, Uniform};
use rand::prelude::*;
use vector::event::Event;
Expand Down Expand Up @@ -233,7 +233,7 @@ fn benchmark_simple_pipe_with_many_writers(c: &mut Criterion) {
let sends = (0..num_writers)
.map(|_| {
let send = send_lines(in_addr, random_lines(line_size).take(num_lines));
futures::sync::oneshot::spawn(send, &rt.executor())
futures01::sync::oneshot::spawn(send, &rt.executor())
})
.collect::<Vec<_>>();

Expand Down Expand Up @@ -567,7 +567,7 @@ fn benchmark_complex(c: &mut Criterion) {
)| {
// One sender generates pure random lines
let send1 = send_lines(in_addr1, random_lines(100).take(num_lines));
let send1 = futures::sync::oneshot::spawn(send1, &rt.executor());
let send1 = futures01::sync::oneshot::spawn(send1, &rt.executor());

// The other includes either status=200 or status=404
let mut rng = SmallRng::from_rng(thread_rng()).unwrap();
Expand All @@ -582,7 +582,7 @@ fn benchmark_complex(c: &mut Criterion) {
})
.take(num_lines),
);
let send2 = futures::sync::oneshot::spawn(send2, &rt.executor());
let send2 = futures01::sync::oneshot::spawn(send2, &rt.executor());
let sends = vec![send1, send2];
rt.block_on(future::join_all(sends)).unwrap();

Expand Down
4 changes: 2 additions & 2 deletions benches/files.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use bytes::Bytes;
use criterion::{criterion_group, Benchmark, Criterion, Throughput};
use futures::{sink::Sink, stream::Stream, Future};
use futures01::{sink::Sink, stream::Stream, Future};
use std::path::PathBuf;
use tempfile::tempdir;
use tokio::codec::{BytesCodec, FramedWrite};
Expand Down Expand Up @@ -72,7 +72,7 @@ fn benchmark_files_without_partitions(c: &mut Criterion) {
Bytes::from(line)
});

let lines = futures::stream::iter_ok::<_, ()>(lines);
let lines = futures01::stream::iter_ok::<_, ()>(lines);

let pump = lines.forward(input);
let (_, _) = rt.block_on(pump).unwrap();
Expand Down
2 changes: 1 addition & 1 deletion benches/http.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use criterion::{criterion_group, Benchmark, Criterion, Throughput};
use futures::Future;
use futures01::Future;
use hyper::service::service_fn_ok;
use hyper::{Body, Response, Server};
use std::net::SocketAddr;
Expand Down
2 changes: 1 addition & 1 deletion src/buffers/disk.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#![cfg(feature = "leveldb")]

use crate::event::{proto, Event};
use futures::{
use futures01::{
task::{self, AtomicTask, Task},
Async, AsyncSink, Poll, Sink, Stream,
};
Expand Down
4 changes: 2 additions & 2 deletions src/buffers/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::Event;
use futures::{sync::mpsc, task::AtomicTask, AsyncSink, Poll, Sink, StartSend, Stream};
use futures01::{sync::mpsc, task::AtomicTask, AsyncSink, Poll, Sink, StartSend, Stream};
use serde::{Deserialize, Serialize};
use std::path::PathBuf;
use std::sync::{
Expand Down Expand Up @@ -189,7 +189,7 @@ impl<S: Sink> Sink for DropWhenFull<S> {
mod test {
use super::{Acker, DropWhenFull};
use crate::test_util::block_on;
use futures::{future, sync::mpsc, task::AtomicTask, Async, AsyncSink, Sink, Stream};
use futures01::{future, sync::mpsc, task::AtomicTask, Async, AsyncSink, Sink, Stream};
use std::sync::{atomic::AtomicUsize, Arc};
use tokio01_test::task::MockTask;

Expand Down
2 changes: 1 addition & 1 deletion src/dns.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::runtime::TaskExecutor;
use futures::{future, Future};
use futures01::{future, Future};
use hyper::client::connect::dns::{Name, Resolve};
use snafu::{futures01::FutureExt, ResultExt};
use std::{
Expand Down
6 changes: 3 additions & 3 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#[macro_use]
extern crate tracing;

use futures::{future, Future, Stream};
use futures01::{future, Future, Stream};
use std::{
cmp::{max, min},
fs::File,
Expand Down Expand Up @@ -353,7 +353,7 @@ fn main() {
};

if signal == SIGINT || signal == SIGTERM {
use futures::future::Either;
use futures01::future::Either;

info!("Shutting down.");
let shutdown = topology.stop();
Expand Down Expand Up @@ -384,7 +384,7 @@ fn main() {
.map_err(|_| ())
.expect("Neither stream errors");

use futures::future::Either;
use futures01::future::Either;

let ctrl_c = match interruption {
Either::A(((_, ctrl_c_stream), _)) => ctrl_c_stream.into_future(),
Expand Down
2 changes: 1 addition & 1 deletion src/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use futures::Future;
use futures01::Future;
use hotmic::{
snapshot::{Snapshot, TypedMeasurement},
Controller, Receiver,
Expand Down
14 changes: 7 additions & 7 deletions src/runtime.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use futures::future::{ExecuteError, Executor, Future};
use futures01::future::{ExecuteError, Executor, Future};
use std::io;
use std::pin::Pin;
use tokio::runtime::Builder;
Expand Down Expand Up @@ -52,7 +52,7 @@ impl Runtime {
F: std::future::Future + Send + 'static,
F::Output: Send + 'static,
{
use futures03::future::{FutureExt, TryFutureExt};
use futures::future::{FutureExt, TryFutureExt};

self.rt
.block_on(future.unit_error().boxed().compat())
Expand All @@ -79,7 +79,7 @@ impl TaskExecutor {
}

pub fn spawn_std(&self, f: impl std::future::Future<Output = ()> + Send + 'static) {
use futures03::future::{FutureExt, TryFutureExt};
use futures::future::{FutureExt, TryFutureExt};

self.spawn(f.unit_error().boxed().compat());
}
Expand All @@ -94,15 +94,15 @@ where
}
}

pub trait FutureExt: futures03::TryFuture {
pub trait FutureExt: futures::TryFuture {
/// Used to compat a `!Unpin` type from 0.3 futures to 0.1
fn boxed_compat(self) -> futures03::compat::Compat<Pin<Box<Self>>>
fn boxed_compat(self) -> futures::compat::Compat<Pin<Box<Self>>>
where
Self: Sized,
{
let fut = Box::pin(self);
futures03::compat::Compat::new(fut)
futures::compat::Compat::new(fut)
}
}

impl<T: futures03::TryFuture> FutureExt for T {}
impl<T: futures::TryFuture> FutureExt for T {}
4 changes: 2 additions & 2 deletions src/sinks/aws_cloudwatch_logs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use crate::{
topology::config::{DataType, SinkConfig, SinkContext},
};
use bytes::Bytes;
use futures::{future, stream::iter_ok, sync::oneshot, Async, Future, Poll, Sink};
use futures01::{future, stream::iter_ok, sync::oneshot, Async, Future, Poll, Sink};
use lazy_static::lazy_static;
use rusoto_core::{request::BufferedHttpResponse, Region, RusotoError};
use rusoto_logs::{
Expand Down Expand Up @@ -734,7 +734,7 @@ mod integration_tests {
test_util::{random_lines_with_stream, random_string},
topology::config::{SinkConfig, SinkContext},
};
use futures::Sink;
use futures01::Sink;
use pretty_assertions::assert_eq;
use rusoto_core::Region;
use rusoto_logs::{CloudWatchLogs, CreateLogGroupRequest, GetLogEventsRequest};
Expand Down
2 changes: 1 addition & 1 deletion src/sinks/aws_cloudwatch_logs/request.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use super::CloudwatchError;
use futures::{sync::oneshot, try_ready, Async, Future, Poll};
use futures01::{sync::oneshot, try_ready, Async, Future, Poll};
use rusoto_core::{RusotoError, RusotoFuture};
use rusoto_logs::{
CloudWatchLogs, CloudWatchLogsClient, CreateLogGroupError, CreateLogGroupRequest,
Expand Down
4 changes: 2 additions & 2 deletions src/sinks/aws_cloudwatch_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use crate::{
topology::config::{DataType, SinkConfig, SinkContext, SinkDescription},
};
use chrono::{DateTime, SecondsFormat, Utc};
use futures::{Future, Poll};
use futures01::{Future, Poll};
use lazy_static::lazy_static;
use rusoto_cloudwatch::{
CloudWatch, CloudWatchClient, Dimension, MetricDatum, PutMetricDataError, PutMetricDataInput,
Expand Down Expand Up @@ -432,7 +432,7 @@ mod integration_tests {
use crate::test_util::{random_string, runtime};
use crate::topology::config::SinkContext;
use chrono::offset::TimeZone;
use futures::{stream, Sink};
use futures01::{stream, Sink};

fn config() -> CloudWatchMetricsSinkConfig {
CloudWatchMetricsSinkConfig {
Expand Down
4 changes: 2 additions & 2 deletions src/sinks/aws_kinesis_firehose.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use crate::{
topology::config::{DataType, SinkConfig, SinkContext, SinkDescription},
};
use bytes::Bytes;
use futures::{stream::iter_ok, Future, Poll, Sink};
use futures01::{stream::iter_ok, Future, Poll, Sink};
use lazy_static::lazy_static;
use rusoto_core::{Region, RusotoError, RusotoFuture};
use rusoto_firehose::{
Expand Down Expand Up @@ -263,7 +263,7 @@ mod integration_tests {
test_util::{random_events_with_stream, random_string},
topology::config::SinkContext,
};
use futures::Sink;
use futures01::Sink;
use rusoto_core::Region;
use rusoto_firehose::{
CreateDeliveryStreamInput, ElasticsearchDestinationConfiguration, KinesisFirehose,
Expand Down
4 changes: 2 additions & 2 deletions src/sinks/aws_kinesis_streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use crate::{
topology::config::{DataType, SinkConfig, SinkContext, SinkDescription},
};
use bytes::Bytes;
use futures::{stream::iter_ok, Future, Poll, Sink};
use futures01::{stream::iter_ok, Future, Poll, Sink};
use lazy_static::lazy_static;
use rand::random;
use rusoto_core::{Region, RusotoError, RusotoFuture};
Expand Down Expand Up @@ -331,7 +331,7 @@ mod integration_tests {
test_util::{random_lines_with_stream, random_string},
topology::config::SinkContext,
};
use futures::{Future, Sink};
use futures01::{Future, Sink};
use rusoto_core::Region;
use rusoto_kinesis::{Kinesis, KinesisClient};
use std::sync::Arc;
Expand Down
8 changes: 4 additions & 4 deletions src/sinks/aws_s3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use crate::{
};
use bytes::Bytes;
use chrono::Utc;
use futures::{stream::iter_ok, Future, Poll, Sink};
use futures01::{stream::iter_ok, Future, Poll, Sink};
use lazy_static::lazy_static;
use rusoto_core::{Region, RusotoError, RusotoFuture};
use rusoto_s3::{
Expand Down Expand Up @@ -512,7 +512,7 @@ mod integration_tests {
topology::config::SinkContext,
};
use flate2::read::GzDecoder;
use futures::{Future, Sink};
use futures01::{Future, Sink};
use pretty_assertions::assert_eq;
use rusoto_core::region::Region;
use rusoto_s3::{S3Client, S3};
Expand Down Expand Up @@ -578,7 +578,7 @@ mod integration_tests {
e
});

let pump = sink.send_all(futures::stream::iter_ok(events));
let pump = sink.send_all(futures01::stream::iter_ok(events));
let _ = rt.block_on(pump).unwrap();

let keys = get_keys(prefix.unwrap());
Expand Down Expand Up @@ -613,7 +613,7 @@ mod integration_tests {

let (lines, _) = random_lines_with_stream(100, 30);

let (tx, rx) = futures::sync::mpsc::channel(1);
let (tx, rx) = futures01::sync::mpsc::channel(1);
let pump = sink.send_all(rx).map(|_| ()).map_err(|_| ());

let mut rt = Runtime::new().unwrap();
Expand Down
2 changes: 1 addition & 1 deletion src/sinks/blackhole.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use crate::{
event::{self, Event},
topology::config::{DataType, SinkConfig, SinkContext, SinkDescription},
};
use futures::{future, AsyncSink, Future, Poll, Sink, StartSend};
use futures01::{future, AsyncSink, Future, Poll, Sink, StartSend};
use serde::{Deserialize, Serialize};

pub struct BlackholeSink {
Expand Down
4 changes: 2 additions & 2 deletions src/sinks/clickhouse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use crate::{
tls::{TlsOptions, TlsSettings},
topology::config::{DataType, SinkConfig, SinkContext, SinkDescription},
};
use futures::{stream::iter_ok, Future, Sink};
use futures01::{stream::iter_ok, Future, Sink};
use http::StatusCode;
use http::{Method, Uri};
use hyper::{Body, Request};
Expand Down Expand Up @@ -260,7 +260,7 @@ mod integration_tests {
test_util::{random_string, runtime},
topology::config::{SinkConfig, SinkContext},
};
use futures::Sink;
use futures01::Sink;
use serde_json::Value;
use std::time::Duration;
use tokio::util::FutureExt;
Expand Down
2 changes: 1 addition & 1 deletion src/sinks/console.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use crate::{
event::{self, Event},
topology::config::{DataType, SinkConfig, SinkContext, SinkDescription},
};
use futures::{future, Sink};
use futures01::{future, Sink};
use serde::{Deserialize, Serialize};
use tokio::{
codec::{FramedWrite, LinesCodec},
Expand Down
2 changes: 1 addition & 1 deletion src/sinks/datadog_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::{
topology::config::{DataType, SinkConfig, SinkContext, SinkDescription},
};
use chrono::{DateTime, Utc};
use futures::{Future, Poll};
use futures01::{Future, Poll};
use http::{uri::InvalidUri, Method, StatusCode, Uri};
use hyper;
use hyper_tls::HttpsConnector;
Expand Down
4 changes: 2 additions & 2 deletions src/sinks/elasticsearch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use crate::{
tls::{TlsOptions, TlsSettings},
topology::config::{DataType, SinkConfig, SinkContext, SinkDescription},
};
use futures::{stream::iter_ok, Future, Sink};
use futures01::{stream::iter_ok, Future, Sink};
use http::{uri::InvalidUri, Method, Uri};
use hyper::{
header::{HeaderName, HeaderValue},
Expand Down Expand Up @@ -423,7 +423,7 @@ mod integration_tests {
topology::config::{SinkConfig, SinkContext},
Event,
};
use futures::{Future, Sink};
use futures01::{Future, Sink};
use hyper::{Body, Request};
use serde_json::{json, Value};
use std::fs::File;
Expand Down
Loading

0 comments on commit 0b92159

Please sign in to comment.