-
Notifications
You must be signed in to change notification settings - Fork 411
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Refact MPPTunnel class to encapsulate different tunnel mode #5286
Conversation
…to one enum 2.Comment rewrite Signed-off-by: yibin <[email protected]>
Signed-off-by: yibin <[email protected]>
[REVIEW NOTIFICATION] This pull request has been approved by:
To complete the pull request process, please ask the reviewers in the list to review by filling The full list of commands accepted by this bot can be found here. Reviewer can indicate their review by submitting an approval review. |
/run-unit-tests |
Coverage for changed files
Coverage summary
full coverage report (for internal network access only) |
Signed-off-by: yibin <[email protected]>
Signed-off-by: yibin <[email protected]>
Signed-off-by: yibin <[email protected]>
dbms/src/Flash/EstablishCall.cpp
Outdated
@@ -117,7 +117,7 @@ void EstablishCallData::writeDone(const ::grpc::Status & status) | |||
state = FINISH; | |||
if (stopwatch) | |||
{ | |||
LOG_FMT_INFO(mpp_tunnel->getLogger(), "connection for {} cost {} ms.", mpp_tunnel->id(), stopwatch->elapsedMilliseconds()); | |||
LOG_FMT_INFO(async_tunnel_sender->getLogger(), "connection cost {} ms.", stopwatch->elapsedMilliseconds()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not log the tunnel id?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In previous version, tunnel_sender doesn't have tunnel_id field, and the Logger itself already has tunnel_id info as prefix, so just remove the tunnel_id.
Since now add tunnel_id field in tunnel_sender, will add it back.
dbms/src/Flash/Mpp/MPPTunnel.cpp
Outdated
async_tunnel_sender = std::make_shared<AsyncTunnelSender>(mode, send_queue, writer, log, tunnel_id); | ||
tunnel_sender = async_tunnel_sender; | ||
RUNTIME_ASSERT(writer != nullptr, log, "Async writer shouldn't be null"); | ||
RUNTIME_ASSERT(tunnel_sender != nullptr, log, "Tunnel sender shouldn't be null"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why add this assert, line 210 already make sure that tunnel_sender
is not nullptr
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You're right. I'll remove the check.
send_queue.push(std::make_shared<mpp::MPPDataPacket>(getPacketWithError(reason))); | ||
if (!is_local && is_async) | ||
writer->tryFlushOne(); | ||
send_queue->push(std::make_shared<mpp::MPPDataPacket>(getPacketWithError(reason))); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When in WaitingForSenderFinish
state, I think the send_queue
is already finished
or cancelled
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems in WaitingForSenderFinish state, MPPTunnel should just ignore the close event.
if (!err_msg.empty()) | ||
{ | ||
err_msg = fmt::format("{} meet error: {}", tunnel_id, err_msg); | ||
LOG_ERROR(log, err_msg); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I suggest to trim the stack trace information in err_msg
after it is logged. Otherwise the error message will be too long and hard to read. Please refer to #5304 for the details
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Got it!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Got it, nice suggestions!
if (!err_msg.empty()) | ||
{ | ||
err_msg = fmt::format("{} meet error: {}", tunnel_id, err_msg); | ||
LOG_ERROR(log, err_msg); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ditto
dbms/src/Flash/Mpp/MPPTunnel.h
Outdated
* - Consumer may close `send_queue` to notify MPPTunnel that an error occurs. | ||
* - After `connect` only the consumer can set `finished` to `true`. | ||
* - Consumer's state is saved in `consumer_state` and be available after consumer finished. | ||
* - MPPTunnel may close `send_queue` to notify Sender normally finish. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There are two ways to close
send_queue: finish
and cancel
, is there any difference between the two close
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Change 'close' to 'finish'.
Signed-off-by: yibin <[email protected]>
dbms/src/Common/MPMCQueue.h
Outdated
@@ -76,7 +76,8 @@ class MPMCQueue | |||
|
|||
/// Block until: | |||
/// 1. Pop succeeds with a valid T: return true. | |||
/// 2. The queue is cancelled or finished: return false. | |||
/// 2. The queue is cancelled: return false. | |||
/// 3. The queue is finished: return true if the queue is not empty. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The first case already contains "queue is finished but not empty".
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about make rule1 like: "The queue is normal or finished, Pop succeeds with a valid T: return true" to make it more clear.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's kind of reverse to me in that I (the user) needn't to know the queue state until the pop
failed. That means a normal queue and a finished queue seem no difference to me if they are both non-empty.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's not a big issue though. It's ok if you think the new desc is more friendly to you.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see, previous comment is a little confusing to me, make me think pop finished queue will always return false because cancel/finish has no differences in the previous comments.
To make it simple and clear, I'll add a table to describe the behavior for both pop/push.
dbms/src/Flash/EstablishCall.h
Outdated
@@ -115,7 +116,7 @@ class EstablishCallData : public PacketWriter | |||
FINISH | |||
}; | |||
CallStatus state; // The current serving state. | |||
std::shared_ptr<DB::MPPTunnel> mpp_tunnel = nullptr; | |||
std::shared_ptr<DB::AsyncTunnelSender> async_tunnel_sender = nullptr; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
needn't to give it a default value.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You're right, I'll remove it.
Unconnected, // Not connect to any writer, not able to accept new data | ||
Connected, // Connected to some writer, accepting data | ||
WaitingForSenderFinish, // Accepting all data already, wait for sender to finish | ||
Finished // Final state, no more work to do |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about differentiate Finished
(finish after connect) and Cancelled
(close before connect)?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, they are different states, I think they are just not so different, since cancel event may lead to Finished state too:)
In the other side, it might be more meaningful to differentiate cancel and finished, I'll consider it later.
Signed-off-by: yibin <[email protected]>
dbms/src/Flash/Mpp/MPPTunnel.h
Outdated
{ | ||
} | ||
|
||
// before finished, must be called without protection of mu |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks like this comment is out of date, there is no mu
in TunnelSender
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, I'll remove it.
} | ||
void setMsg(const String & msg) | ||
{ | ||
promise.set_value(msg); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not releated to this pr, but do we need some protections to make sure promise.set_value
is not called multiple times? Since here says "An exception is thrown if there is no shared state or the shared state already stores a value or exception."
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Currently, the suggested way to set consumer_state's msg is to call consumerFinish. In consumerFinish, there is the protection, check if msg already set, if so, just do nothing and return.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about we enhance setMsg
as
void setMsg(const String & msg)
{
bool old_value = false;
if (!msg_has_set.compare_exchange_strong(old_val, true, std::memory_order_seq_cst, std::memory_order_relaxed))
return;
promise.set_value(msg);
}
And consumerFinish
don't need to involve lock.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The tricky part of this is that, it's an obvious bug if promise.set_value
is called more than once. Do we need to protect for potential bug and pretend nothing happened, or just leave it throw then we could quickly know there's a bug?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done, Good point from Fu, I would choose to protect this, since we used to allow multiple invocations of consumeFinish to ensure the MPPTunnel not blocked, and I don't see any significant bad effects here.
dbms/src/Flash/Mpp/MPPTunnel.h
Outdated
/// close() finishes the tunnel, if the tunnel is connected already, it will | ||
/// write the error message to the tunnel, otherwise it just close the tunnel | ||
void close(const String & reason); | ||
|
||
// a MPPConn request has arrived. it will build connection by this tunnel; | ||
void connect(Writer * writer_); | ||
virtual void connect(PacketWriter * writer); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why need to be a virtual function, seems I don't found override version of it
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good catch, it is a mistake, I'll fix it.
Signed-off-by: yibin <[email protected]>
Signed-off-by: yibin <[email protected]>
Signed-off-by: yibin <[email protected]>
Signed-off-by: yibin <[email protected]>
Signed-off-by: yibin <[email protected]>
Signed-off-by: yibin <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Others LGTM
Signed-off-by: yibin <[email protected]>
/run-unit-tests |
Signed-off-by: yibin <[email protected]>
/merge |
@yibin87: It seems you want to merge this PR, I will help you trigger all the tests: /run-all-tests You only need to trigger If you have any questions about the PR merge process, please refer to pr process. Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the ti-community-infra/tichi repository. |
This pull request has been accepted and is ready to merge. Commit hash: 02a26f2
|
@yibin87: Your PR was out of date, I have automatically updated it for you. At the same time I will also trigger all tests for you: /run-all-tests If the CI test fails, you just re-trigger the test that failed and the bot will merge the PR for you after the CI passes. Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the ti-community-infra/tichi repository. |
Coverage for changed files
Coverage summary
full coverage report (for internal network access only) |
/run-all-tests |
Coverage for changed files
Coverage summary
full coverage report (for internal network access only) |
What problem does this PR solve?
Issue Number: close #5095
Problem Summary:
What is changed and how it works?
Check List
Tests
Side effects
Documentation
Release note