Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
Signed-off-by: xufei <[email protected]>
  • Loading branch information
windtalker committed Aug 25, 2022
1 parent 3bbdfa4 commit 27dfce2
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 4 deletions.
2 changes: 0 additions & 2 deletions dbms/src/Flash/FlashService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -295,13 +295,11 @@ ::grpc::Status FlashService::establishMPPConnectionSyncOrAsync(::grpc::ServerCon
{
// In async mode, this function won't wait for the request done and the finish event is handled in EstablishCallData.
tunnel->connect(calldata);
LOG_FMT_DEBUG(tunnel->getLogger(), "connect tunnel successfully in async way");
}
else
{
SyncPacketWriter writer(sync_writer);
tunnel->connect(&writer);
LOG_FMT_DEBUG(tunnel->getLogger(), "connect tunnel successfully and begin to wait");
tunnel->waitForFinish();
LOG_FMT_INFO(tunnel->getLogger(), "connection for {} cost {} ms.", tunnel->id(), stopwatch.elapsedMilliseconds());
}
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Flash/Mpp/ExchangeReceiver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ bool pushPacket(size_t source_index,
fiu_do_on(FailPoints::random_receiver_async_msg_push_failure_failpoint, push_succeed = false;);
}
}
LOG_FMT_DEBUG(log, "push recv_msg to msg_channels(size: {}) succeed:{}, enable_fine_grained_shuffle: {}", msg_channels.size(), push_succeed, enable_fine_grained_shuffle);
LOG_FMT_TRACE(log, "push recv_msg to msg_channels(size: {}) succeed:{}, enable_fine_grained_shuffle: {}", msg_channels.size(), push_succeed, enable_fine_grained_shuffle);
return push_succeed;
}

Expand Down
20 changes: 19 additions & 1 deletion dbms/src/Flash/Mpp/MPPTunnel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,24 @@ extern const char exception_during_mpp_close_tunnel[];
extern const char random_tunnel_wait_timeout_failpoint[];
} // namespace FailPoints

namespace
{
String tunnelSenderModeToString(TunnelSenderMode mode)
{
switch (mode)
{
case TunnelSenderMode::ASYNC_GRPC:
return "async";
case TunnelSenderMode::SYNC_GRPC:
return "sync";
case TunnelSenderMode::LOCAL:
return "local";
default:
return "unknown";
}
}
} // namespace

MPPTunnel::MPPTunnel(
const mpp::TaskMeta & receiver_meta_,
const mpp::TaskMeta & sender_meta_,
Expand Down Expand Up @@ -207,7 +225,7 @@ void MPPTunnel::connect(PacketWriter * writer)
status = TunnelStatus::Connected;
cv_for_status_changed.notify_all();
}
LOG_DEBUG(log, "connected");
LOG_FMT_DEBUG(log, "Tunnel connected in {} mode", tunnelSenderModeToString(mode));
}

void MPPTunnel::waitForFinish()
Expand Down

0 comments on commit 27dfce2

Please sign in to comment.