Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: fix data race occurred in SharedQueryBlockInputStream. #5309

Closed
wants to merge 11 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
237 changes: 128 additions & 109 deletions dbms/src/DataStreams/IProfilingBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,24 +12,23 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Interpreters/Quota.h>
#include <Interpreters/ProcessList.h>
#include <DataStreams/IProfilingBlockInputStream.h>

#include <Interpreters/ProcessList.h>
#include <Interpreters/Quota.h>

namespace DB
{

namespace ErrorCodes
{
extern const int TOO_MANY_ROWS;
extern const int TOO_MANY_BYTES;
extern const int TOO_MANY_ROWS_OR_BYTES;
extern const int TIMEOUT_EXCEEDED;
extern const int TOO_SLOW;
extern const int LOGICAL_ERROR;
extern const int BLOCKS_HAVE_DIFFERENT_STRUCTURE;
}
extern const int TOO_MANY_ROWS;
extern const int TOO_MANY_BYTES;
extern const int TOO_MANY_ROWS_OR_BYTES;
extern const int TIMEOUT_EXCEEDED;
extern const int TOO_SLOW;
extern const int LOGICAL_ERROR;
extern const int BLOCKS_HAVE_DIFFERENT_STRUCTURE;
} // namespace ErrorCodes


IProfilingBlockInputStream::IProfilingBlockInputStream()
Expand All @@ -51,21 +50,25 @@ Block IProfilingBlockInputStream::read(FilterPtr & res_filter, bool return_filte
total_rows_approx = 0;
}

if (!info.started)
{
info.total_stopwatch.start();
info.started = true;
}

UInt64 start_time;
Block res;
{
auto lock = get_lock();
if (!info.started)
{
info.total_stopwatch.start();
info.started = true;
}

if (isCancelledOrThrowIfKilled())
return res;

auto start_time = info.total_stopwatch.elapsed();
if (isCancelledOrThrowIfKilled())
{
return res;
}
start_time = info.total_stopwatch.elapsed();

if (!checkTimeLimit())
limit_exceeded_need_break = true;
if (!checkTimeLimit())
limit_exceeded_need_break = true;
}

if (!limit_exceeded_need_break)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

limit_exceeded_need_break is still not in lock protection.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have protected it, but IMO the code is hard to read.

Copy link
Contributor

@gengliqi gengliqi Jul 14, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmmm...I see that limit_exceeded_need_break is not an atomic variable and is read without a lock. You can see https://stackoverflow.com/questions/14624776/can-a-bool-read-write-operation-be-not-atomic-on-x86 for more information.

{
Expand All @@ -77,16 +80,19 @@ Block IProfilingBlockInputStream::read(FilterPtr & res_filter, bool return_filte

if (res)
{
info.update(res);
{
auto lock = get_lock();
info.update(res);

if (enabled_extremes)
updateExtremes(res);
if (enabled_extremes)
updateExtremes(res);

if (limits.mode == LIMITS_CURRENT && !limits.size_limits.check(info.rows, info.bytes, "result", ErrorCodes::TOO_MANY_ROWS_OR_BYTES))
limit_exceeded_need_break = true;
if (limits.mode == LIMITS_CURRENT && !limits.size_limits.check(info.rows, info.bytes, "result", ErrorCodes::TOO_MANY_ROWS_OR_BYTES))
limit_exceeded_need_break = true;

if (quota != nullptr)
checkQuota(res);
if (quota != nullptr)
checkQuota(res);
}
}
else
{
Expand All @@ -110,36 +116,52 @@ Block IProfilingBlockInputStream::read(FilterPtr & res_filter, bool return_filte
}
#endif

info.updateExecutionTime(info.total_stopwatch.elapsed() - start_time);
{
auto lock = get_lock();
info.updateExecutionTime(info.total_stopwatch.elapsed() - start_time);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In progressImpl function, info still may be used without lock protection.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

return res;
}


void IProfilingBlockInputStream::readPrefix()
{
auto start_time = info.total_stopwatch.elapsed();
UInt64 start_time;
{
auto lock = get_lock();
start_time = info.total_stopwatch.elapsed();
}
readPrefixImpl();

forEachChild([&] (IBlockInputStream & child)
{
forEachChild([&](IBlockInputStream & child) {
SeaRise marked this conversation as resolved.
Show resolved Hide resolved
child.readPrefix();
return false;
});
info.updateExecutionTime(info.total_stopwatch.elapsed() - start_time);
{
auto lock = get_lock();
info.updateExecutionTime(info.total_stopwatch.elapsed() - start_time);
}
}


void IProfilingBlockInputStream::readSuffix()
{
auto start_time = info.total_stopwatch.elapsed();
forEachChild([&] (IBlockInputStream & child)
UInt64 start_time;
{
auto lock = get_lock();
start_time = info.total_stopwatch.elapsed();
}

forEachChild([&](IBlockInputStream & child) {
child.readSuffix();
return false;
});

readSuffixImpl();
info.updateExecutionTime(info.total_stopwatch.elapsed() - start_time);
{
auto lock = get_lock();
info.updateExecutionTime(info.total_stopwatch.elapsed() - start_time);
}
}


Expand Down Expand Up @@ -213,24 +235,23 @@ static bool handleOverflowMode(OverflowMode mode, const String & message, int co
{
switch (mode)
{
case OverflowMode::THROW:
throw Exception(message, code);
case OverflowMode::BREAK:
return false;
default:
throw Exception("Logical error: unknown overflow mode", ErrorCodes::LOGICAL_ERROR);
case OverflowMode::THROW:
throw Exception(message, code);
case OverflowMode::BREAK:
return false;
default:
throw Exception("Logical error: unknown overflow mode", ErrorCodes::LOGICAL_ERROR);
}
};


bool IProfilingBlockInputStream::checkTimeLimit()
bool IProfilingBlockInputStream::checkTimeLimit() const
{
if (limits.max_execution_time != 0
&& info.total_stopwatch.elapsed() > static_cast<UInt64>(limits.max_execution_time.totalMicroseconds()) * 1000)
return handleOverflowMode(limits.timeout_overflow_mode,
"Timeout exceeded: elapsed " + toString(info.total_stopwatch.elapsedSeconds())
+ " seconds, maximum: " + toString(limits.max_execution_time.totalMicroseconds() / 1000000.0),
ErrorCodes::TIMEOUT_EXCEEDED);
"Timeout exceeded: elapsed " + toString(info.total_stopwatch.elapsedSeconds())
+ " seconds, maximum: " + toString(limits.max_execution_time.totalMicroseconds() / 1000000.0),
ErrorCodes::TIMEOUT_EXCEEDED);

return true;
}
Expand All @@ -240,24 +261,28 @@ void IProfilingBlockInputStream::checkQuota(Block & block)
{
switch (limits.mode)
{
case LIMITS_TOTAL:
/// Checked in `progress` method.
break;
case LIMITS_TOTAL:
/// Checked in `progress` method.
break;

case LIMITS_CURRENT:
case LIMITS_CURRENT:
{
time_t current_time = time(nullptr);
double total_elapsed;
{
time_t current_time = time(nullptr);
double total_elapsed = info.total_stopwatch.elapsedSeconds();
auto lock = get_lock();
total_elapsed = info.total_stopwatch.elapsedSeconds();
}

quota->checkAndAddResultRowsBytes(current_time, block.rows(), block.bytes());
quota->checkAndAddExecutionTime(current_time, Poco::Timespan((total_elapsed - prev_elapsed) * 1000000.0));
quota->checkAndAddResultRowsBytes(current_time, block.rows(), block.bytes());
quota->checkAndAddExecutionTime(current_time, Poco::Timespan((total_elapsed - prev_elapsed) * 1000000.0));

prev_elapsed = total_elapsed;
break;
}
prev_elapsed = total_elapsed;
break;
}

default:
throw Exception("Logical error: unknown limits mode.", ErrorCodes::LOGICAL_ERROR);
default:
throw Exception("Logical error: unknown limits mode.", ErrorCodes::LOGICAL_ERROR);
}
}

Expand Down Expand Up @@ -287,48 +312,51 @@ void IProfilingBlockInputStream::progressImpl(const Progress & value)
{
switch (limits.size_limits.overflow_mode)
{
case OverflowMode::THROW:
{
if (limits.size_limits.max_rows && total_rows_estimate > limits.size_limits.max_rows)
throw Exception("Limit for rows to read exceeded: " + toString(total_rows_estimate)
+ " rows read (or to read), maximum: " + toString(limits.size_limits.max_rows),
ErrorCodes::TOO_MANY_ROWS);
else
throw Exception("Limit for (uncompressed) bytes to read exceeded: " + toString(progress.bytes)
+ " bytes read, maximum: " + toString(limits.size_limits.max_bytes),
ErrorCodes::TOO_MANY_BYTES);
break;
}
case OverflowMode::THROW:
{
if (limits.size_limits.max_rows && total_rows_estimate > limits.size_limits.max_rows)
throw Exception("Limit for rows to read exceeded: " + toString(total_rows_estimate)
+ " rows read (or to read), maximum: " + toString(limits.size_limits.max_rows),
ErrorCodes::TOO_MANY_ROWS);
else
throw Exception("Limit for (uncompressed) bytes to read exceeded: " + toString(progress.bytes)
+ " bytes read, maximum: " + toString(limits.size_limits.max_bytes),
ErrorCodes::TOO_MANY_BYTES);
break;
}

case OverflowMode::BREAK:
case OverflowMode::BREAK:
{
/// For `break`, we will stop only if so many rows were actually read, and not just supposed to be read.
if ((limits.size_limits.max_rows && progress.rows > limits.size_limits.max_rows)
|| (limits.size_limits.max_bytes && progress.bytes > limits.size_limits.max_bytes))
{
/// For `break`, we will stop only if so many rows were actually read, and not just supposed to be read.
if ((limits.size_limits.max_rows && progress.rows > limits.size_limits.max_rows)
|| (limits.size_limits.max_bytes && progress.bytes > limits.size_limits.max_bytes))
{
cancel(false);
}

break;
cancel(false);
}

default:
throw Exception("Logical error: unknown overflow mode", ErrorCodes::LOGICAL_ERROR);
break;
}

default:
throw Exception("Logical error: unknown overflow mode", ErrorCodes::LOGICAL_ERROR);
}
}

size_t total_rows = progress.total_rows;

if (limits.min_execution_speed || (total_rows && limits.timeout_before_checking_execution_speed != 0))
{
double total_elapsed = info.total_stopwatch.elapsedSeconds();

double total_elapsed;
{
auto lock = get_lock();
total_elapsed = info.total_stopwatch.elapsedSeconds();
}
if (total_elapsed > limits.timeout_before_checking_execution_speed.totalMicroseconds() / 1000000.0)
{
if (limits.min_execution_speed && progress.rows / total_elapsed < limits.min_execution_speed)
throw Exception("Query is executing too slow: " + toString(progress.rows / total_elapsed)
+ " rows/sec., minimum: " + toString(limits.min_execution_speed),
ErrorCodes::TOO_SLOW);
+ " rows/sec., minimum: " + toString(limits.min_execution_speed),
ErrorCodes::TOO_SLOW);

size_t total_rows = progress.total_rows;

Expand All @@ -339,9 +367,9 @@ void IProfilingBlockInputStream::progressImpl(const Progress & value)

if (estimated_execution_time_seconds > limits.max_execution_time.totalSeconds())
throw Exception("Estimated query execution time (" + toString(estimated_execution_time_seconds) + " seconds)"
+ " is too long. Maximum: " + toString(limits.max_execution_time.totalSeconds())
+ ". Estimated rows to process: " + toString(total_rows),
ErrorCodes::TOO_SLOW);
+ " is too long. Maximum: " + toString(limits.max_execution_time.totalSeconds())
+ ". Estimated rows to process: " + toString(total_rows),
ErrorCodes::TOO_SLOW);
}
}
}
Expand All @@ -363,8 +391,7 @@ void IProfilingBlockInputStream::cancel(bool kill)
if (!is_cancelled.compare_exchange_strong(old_val, true, std::memory_order_seq_cst, std::memory_order_relaxed))
return;

forEachProfilingChild([&] (IProfilingBlockInputStream & child)
{
forEachProfilingChild([&](IProfilingBlockInputStream & child) {
child.cancel(kill);
return false;
});
Expand All @@ -390,8 +417,7 @@ void IProfilingBlockInputStream::setProgressCallback(const ProgressCallback & ca
{
progress_callback = callback;

forEachProfilingChild([&] (IProfilingBlockInputStream & child)
{
forEachProfilingChild([&](IProfilingBlockInputStream & child) {
child.setProgressCallback(callback);
return false;
});
Expand All @@ -402,8 +428,7 @@ void IProfilingBlockInputStream::setProcessListElement(ProcessListElement * elem
{
process_list_elem = elem;

forEachProfilingChild([&] (IProfilingBlockInputStream & child)
{
forEachProfilingChild([&](IProfilingBlockInputStream & child) {
child.setProcessListElement(elem);
return false;
});
Expand All @@ -416,12 +441,9 @@ Block IProfilingBlockInputStream::getTotals()
return totals;

Block res;
forEachProfilingChild([&] (IProfilingBlockInputStream & child)
{
forEachProfilingChild([&](IProfilingBlockInputStream & child) {
res = child.getTotals();
if (res)
return true;
return false;
return static_cast<bool>(res);
});
return res;
}
Expand All @@ -432,14 +454,11 @@ Block IProfilingBlockInputStream::getExtremes()
return extremes;

Block res;
forEachProfilingChild([&] (IProfilingBlockInputStream & child)
{
forEachProfilingChild([&](IProfilingBlockInputStream & child) {
res = child.getExtremes();
if (res)
return true;
return false;
return static_cast<bool>(res);
});
return res;
}

}
} // namespace DB
Loading