Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: replace *join_all with for loop in flush #947

Merged
merged 5 commits into from
Jun 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions analytic_engine/src/instance/flush_compaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ use common_util::{
};
use futures::{
channel::{mpsc, mpsc::channel},
future::try_join_all,
stream, SinkExt, TryStreamExt,
};
use log::{debug, error, info};
Expand Down Expand Up @@ -542,9 +541,9 @@ impl FlushTask {
}
batch_record_senders.clear();

let info_and_metas = try_join_all(sst_handlers).await.context(RuntimeJoin)?;
for (idx, info_and_meta) in info_and_metas.into_iter().enumerate() {
let (sst_info, sst_meta) = info_and_meta?;
for (idx, sst_handler) in sst_handlers.into_iter().enumerate() {
let info_and_metas = sst_handler.await.context(RuntimeJoin)?;
let (sst_info, sst_meta) = info_and_metas?;
files_to_level0.push(AddFile {
level: Level::MIN,
file: FileMeta {
Expand Down
13 changes: 6 additions & 7 deletions analytic_engine/src/row_iter/merge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use common_types::{
SequenceNumber,
};
use common_util::{define_result, error::GenericError};
use futures::{future::try_join_all, StreamExt};
use futures::{stream::FuturesUnordered, StreamExt};
use log::{debug, trace};
use snafu::{ensure, Backtrace, ResultExt, Snafu};
use table_engine::{predicate::PredicateRef, table::TableId};
Expand Down Expand Up @@ -675,21 +675,20 @@ impl MergeIterator {
let init_start = Instant::now();

// Initialize buffered streams concurrently.
let mut init_buffered_streams = Vec::with_capacity(self.origin_streams.len());
let mut init_buffered_streams = FuturesUnordered::new();
for origin_stream in mem::take(&mut self.origin_streams) {
let schema = self.schema.clone();
init_buffered_streams
.push(async move { BufferedStream::build(schema, origin_stream).await });
init_buffered_streams.push(BufferedStream::build(schema, origin_stream));
}

let pull_start = Instant::now();
let buffered_streams = try_join_all(init_buffered_streams).await?;
self.metrics.scan_duration += pull_start.elapsed();
self.metrics.scan_count += buffered_streams.len();
self.metrics.scan_count += init_buffered_streams.len();

// Push streams to heap.
let current_schema = &self.schema;
for buffered_stream in buffered_streams {
while let Some(buffered_stream) = init_buffered_streams.next().await {
let buffered_stream = buffered_stream?;
let stream_schema = buffered_stream.schema();
ensure!(
current_schema == stream_schema,
Expand Down
31 changes: 16 additions & 15 deletions partition_table_engine/src/partition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use common_types::{
schema::Schema,
};
use common_util::error::BoxError;
use futures::future::try_join_all;
use futures::{stream::FuturesUnordered, StreamExt};
use snafu::ResultExt;
use table_engine::{
partition::{
Expand Down Expand Up @@ -242,27 +242,28 @@ impl Table for PartitionTableImpl {
};

// Query streams through remote engine.
let mut futures = Vec::with_capacity(partitions.len());
let mut futures = FuturesUnordered::new();
for partition in partitions {
let remote_engine = self.remote_engine.clone();
let request_clone = request.clone();
futures.push(async move {
remote_engine
.read(RemoteReadRequest {
table: self.get_sub_table_ident(partition),
read_request: request_clone,
})
.await
})
let read_partition = self.remote_engine.read(RemoteReadRequest {
table: self.get_sub_table_ident(partition),
read_request: request.clone(),
});
futures.push(read_partition);
}

let mut record_batch_streams = Vec::with_capacity(futures.len());
while let Some(record_batch_stream) = futures.next().await {
let record_batch_stream = record_batch_stream
.box_err()
.context(Scan { table: self.name() })?;
record_batch_streams.push(record_batch_stream);
}

let streams = {
let _remote_timer = PARTITION_TABLE_PARTITIONED_READ_DURATION_HISTOGRAM
.with_label_values(&["remote_read"])
.start_timer();
try_join_all(futures).await.box_err().context(Scan {
table: self.name().to_string(),
})?
record_batch_streams
};

Ok(PartitionedStreams { streams })
Expand Down
16 changes: 5 additions & 11 deletions proxy/src/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,7 @@ use common_types::{
time::Timestamp,
};
use common_util::error::BoxError;
use futures::{
future::{try_join_all, BoxFuture},
FutureExt,
};
use futures::{future::BoxFuture, stream::FuturesUnordered, FutureExt, StreamExt};
use http::StatusCode;
use interpreters::interpreter::Output;
use log::{debug, error, info};
Expand Down Expand Up @@ -394,16 +391,13 @@ impl<Q: QueryExecutor + 'static> Proxy<Q> {
&self,
futures: Vec<BoxFuture<'_, common_util::runtime::Result<Result<WriteResponse>>>>,
) -> Result<WriteResponse> {
let resps = try_join_all(futures)
.await
.box_err()
.context(ErrWithCause {
let mut futures: FuturesUnordered<_> = futures.into_iter().collect();
let mut success = 0;
while let Some(resp) = futures.next().await {
let resp = resp.box_err().context(ErrWithCause {
code: StatusCode::INTERNAL_SERVER_ERROR,
msg: "Failed to join task",
})?;

let mut success = 0;
for resp in resps {
success += resp?.success;
}

Expand Down
8 changes: 4 additions & 4 deletions remote_engine_client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use common_types::{
projected_schema::ProjectedSchema, record_batch::RecordBatch, schema::RecordSchema,
};
use common_util::{error::BoxError, runtime::Runtime};
use futures::{future::join_all, Stream, StreamExt};
use futures::{Stream, StreamExt};
use log::info;
use router::RouterRef;
use snafu::{ensure, OptionExt, ResultExt};
Expand Down Expand Up @@ -188,9 +188,9 @@ impl Client {
written_tables.push(table_idents);
}

let remote_write_results = join_all(remote_writes).await;
let mut results = Vec::with_capacity(remote_write_results.len());
for (table_idents, batch_result) in written_tables.into_iter().zip(remote_write_results) {
let mut results = Vec::with_capacity(remote_writes.len());
for (table_idents, remote_write) in written_tables.into_iter().zip(remote_writes) {
let batch_result = remote_write.await;
// If it's a runtime error, don't evict entries from the route cache.
let batch_result = match batch_result.box_err() {
Ok(result) => result,
Expand Down
10 changes: 4 additions & 6 deletions wal/src/tests/read_write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -404,12 +404,10 @@ async fn write_multiple_regions_parallelly<B: WalBuilder + 'static>(env: Arc<Tes
handles.push(read_write_0);
handles.push(read_write_1);
}
futures::future::join_all(handles)
.await
.into_iter()
.for_each(|res| {
res.expect("should succeed to join the write");
});

for handle in handles {
handle.await.expect("should succeed to join the write")
}

wal.close_gracefully().await.unwrap();
}
Expand Down