Skip to content

Commit

Permalink
Merge branch 'master' into support_pipeline_table_scan_fullstack
Browse files Browse the repository at this point in the history
  • Loading branch information
ywqzzy authored Apr 6, 2023
2 parents 7c87eaa + 5e646a9 commit de73ae5
Show file tree
Hide file tree
Showing 57 changed files with 934 additions and 311 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,40 +14,40 @@

#pragma once

#include <Flash/Executor/QueryExecutor.h>

#include <mutex>

namespace DB
{
class QueryExecutorHolder
template <typename Ptr>
class PtrHolder
{
public:
void set(QueryExecutorPtr && query_executor_)
void set(Ptr && obj_)
{
assert(obj_);
std::lock_guard lock(mu);
assert(!query_executor);
query_executor = std::move(query_executor_);
assert(!obj);
obj = std::move(obj_);
}

std::optional<QueryExecutor *> tryGet()
auto tryGet()
{
std::optional<QueryExecutor *> res;
std::optional<decltype(obj.get())> res;
std::lock_guard lock(mu);
if (query_executor != nullptr)
res.emplace(query_executor.get());
if (obj != nullptr)
res.emplace(obj.get());
return res;
}

QueryExecutor * operator->()
auto operator->()
{
std::lock_guard lock(mu);
assert(query_executor != nullptr);
return query_executor.get();
assert(obj != nullptr);
return obj.get();
}

private:
std::mutex mu;
QueryExecutorPtr query_executor;
Ptr obj;
};
} // namespace DB
79 changes: 79 additions & 0 deletions dbms/src/Common/UniThreadPool.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#pragma once

#include <Common/Exception.h>
#include <Poco/Event.h>
#include <boost_wrapper/priority_queue.h>

Expand All @@ -23,14 +24,18 @@
#include <cstdint>
#include <ext/scope_guard.h>
#include <functional>
#include <future>
#include <list>
#include <memory>
#include <mutex>
#include <optional>
#include <queue>
#include <thread>

namespace DB
{
template <typename Thread>
class ThreadPoolWaitGroup;
/** Very simple thread pool similar to boost::threadpool.
* Advantages:
* - catches exceptions and rethrows on wait.
Expand Down Expand Up @@ -93,6 +98,11 @@ class ThreadPoolImpl
void setQueueSize(size_t value);
size_t getMaxThreads() const;

std::unique_ptr<ThreadPoolWaitGroup<Thread>> waitGroup()
{
return std::make_unique<ThreadPoolWaitGroup<Thread>>(*this);
}

private:
mutable std::mutex mutex;
std::condition_variable job_finished;
Expand Down Expand Up @@ -284,6 +294,75 @@ class ThreadFromGlobalPoolImpl : boost::noncopyable
}
};

/// ThreadPoolWaitGroup is used to wait all the task launched here to finish
/// To guarantee the exception safty of ThreadPoolWaitGroup, we need to create object, do schedule and wait in the same scope.
template <typename Thread>
class ThreadPoolWaitGroup
{
public:
explicit ThreadPoolWaitGroup(ThreadPoolImpl<Thread> & thread_pool_)
: thread_pool(thread_pool_)
{}
ThreadPoolWaitGroup(const ThreadPoolWaitGroup &) = delete;
~ThreadPoolWaitGroup()
{
try
{
wait();
}
catch (...)
{
tryLogCurrentException(Logger::get(), "Error in destructor function of ThreadPoolWaitGroup");
}
}

void schedule(std::function<void()> func)
{
auto task = std::make_shared<std::packaged_task<void()>>(func);
thread_pool.scheduleOrThrowOnError([task] { (*task)(); });
futures.emplace_back(task->get_future());
}

void wait()
{
if (consumed)
return;
consumed = true;

std::exception_ptr first_exception;
for (auto & future : futures)
{
// ensure all futures finished
try
{
future.get();
}
catch (...)
{
if (!first_exception)
first_exception = std::current_exception();
}
}

if (first_exception)
{
try
{
std::rethrow_exception(first_exception);
}
catch (Exception & exc)
{
exc.addMessage(exc.getStackTrace().toString());
exc.rethrow();
}
}
}

private:
std::vector<std::future<void>> futures;
ThreadPoolImpl<Thread> & thread_pool;
bool consumed = false;
};
/// Schedule jobs/tasks on global thread pool without implicit passing tracing context on current thread to underlying worker as parent tracing context.
///
/// If you implement your own job/task scheduling upon global thread pool or schedules a long time running job in a infinite loop way,
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Databases/DatabaseMemory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ DatabaseMemory::DatabaseMemory(String name_)

void DatabaseMemory::loadTables(
Context & /*context*/,
legacy::ThreadPool * /*thread_pool*/,
ThreadPool * /*thread_pool*/,
bool /*has_force_restore_data_flag*/)
{
/// Nothing to load.
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Databases/DatabaseMemory.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ class DatabaseMemory : public DatabaseWithOwnTablesBase

void loadTables(
Context & context,
legacy::ThreadPool * thread_pool,
ThreadPool * thread_pool,
bool has_force_restore_data_flag) override;

void createTable(
Expand Down
51 changes: 40 additions & 11 deletions dbms/src/Databases/DatabaseOrdinary.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include <Common/FailPoint.h>
#include <Common/Stopwatch.h>
#include <Common/StringUtils/StringUtils.h>
#include <Common/UniThreadPool.h>
#include <Common/escapeForFileName.h>
#include <Databases/DatabaseMemory.h>
#include <Databases/DatabaseOrdinary.h>
Expand All @@ -29,11 +30,9 @@
#include <Parsers/ParserCreateQuery.h>
#include <Parsers/parseQuery.h>
#include <Poco/DirectoryIterator.h>
#include <common/ThreadPool.h>
#include <common/logger_useful.h>
#include <fmt/core.h>


namespace DB
{
namespace ErrorCodes
Expand All @@ -46,6 +45,7 @@ extern const int FILE_DOESNT_EXIST;
extern const int LOGICAL_ERROR;
extern const int CANNOT_GET_CREATE_TABLE_QUERY;
extern const int SYNTAX_ERROR;
extern const int TIDB_TABLE_ALREADY_EXISTS;
} // namespace ErrorCodes

namespace FailPoints
Expand Down Expand Up @@ -83,7 +83,7 @@ DatabaseOrdinary::DatabaseOrdinary(String name_, const String & metadata_path_,
}


void DatabaseOrdinary::loadTables(Context & context, legacy::ThreadPool * thread_pool, bool has_force_restore_data_flag)
void DatabaseOrdinary::loadTables(Context & context, ThreadPool * thread_pool, bool has_force_restore_data_flag)
{
using FileNames = std::vector<std::string>;
FileNames file_names = DatabaseLoading::listSQLFilenames(metadata_path, log);
Expand All @@ -102,10 +102,15 @@ void DatabaseOrdinary::loadTables(Context & context, legacy::ThreadPool * thread
AtomicStopwatch watch;
std::atomic<size_t> tables_processed{0};

auto wait_group = thread_pool ? thread_pool->waitGroup() : nullptr;

std::mutex failed_tables_mutex;
Tables tables_failed_to_startup;

auto task_function = [&](FileNames::const_iterator begin, FileNames::const_iterator end) {
for (auto it = begin; it != end; ++it)
{
const String & table = *it;
const String & table_file = *it;

/// Messages, so that it's not boring to wait for the server to load for a long time.
if ((++tables_processed) % PRINT_MESSAGE_EACH_N_TABLES == 0 || watch.compareAndRestart(PRINT_MESSAGE_EACH_N_SECONDS))
Expand All @@ -114,7 +119,32 @@ void DatabaseOrdinary::loadTables(Context & context, legacy::ThreadPool * thread
watch.restart();
}

DatabaseLoading::loadTable(context, *this, metadata_path, name, data_path, getEngineName(), table, has_force_restore_data_flag);
auto [table_name, table] = DatabaseLoading::loadTable(context, *this, metadata_path, name, data_path, getEngineName(), table_file, has_force_restore_data_flag);

/// After table was basically initialized, startup it.
if (table)
{
try
{
table->startup();
}
catch (DB::Exception & e)
{
if (e.code() == ErrorCodes::TIDB_TABLE_ALREADY_EXISTS)
{
// While doing IStorage::startup, Exception thorwn with TIDB_TABLE_ALREADY_EXISTS,
// means that we may crashed in the middle of renaming tables. We clean the meta file
// for those storages by `cleanupTables`.
// - If the storage is the outdated one after renaming, remove it is right.
// - If the storage should be the target table, remove it means we "rollback" the
// rename action. And the table will be renamed by TiDBSchemaSyncer later.
std::lock_guard lock(failed_tables_mutex);
tables_failed_to_startup.emplace(table_name, table);
}
else
throw;
}
}
}
};

Expand All @@ -126,21 +156,20 @@ void DatabaseOrdinary::loadTables(Context & context, legacy::ThreadPool * thread
auto begin = file_names.begin() + i * bunch_size;
auto end = (i + 1 == num_bunches) ? file_names.end() : (file_names.begin() + (i + 1) * bunch_size);

auto task = [task_function, begin, end] {
return task_function(begin, end);
auto task = [&task_function, begin, end] {
task_function(begin, end);
};

if (thread_pool)
thread_pool->schedule(task);
wait_group->schedule(task);
else
task();
}

if (thread_pool)
thread_pool->wait();
wait_group->wait();

/// After all tables was basically initialized, startup them.
DatabaseLoading::startupTables(*this, name, tables, thread_pool, log);
DatabaseLoading::cleanupTables(*this, name, tables_failed_to_startup, log);
}

void DatabaseOrdinary::createTable(const Context & context, const String & table_name, const StoragePtr & table, const ASTPtr & query)
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Databases/DatabaseOrdinary.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ class DatabaseOrdinary : public DatabaseWithOwnTablesBase

void loadTables(
Context & context,
legacy::ThreadPool * thread_pool,
ThreadPool * thread_pool,
bool has_force_restore_data_flag) override;

void createTable(
Expand Down
Loading

0 comments on commit de73ae5

Please sign in to comment.