Skip to content

Commit

Permalink
Test: Support coprocessor test (#5957)
Browse files Browse the repository at this point in the history
ref #4609
  • Loading branch information
ywqzzy authored Sep 20, 2022
1 parent 1223584 commit c003541
Show file tree
Hide file tree
Showing 13 changed files with 105 additions and 13 deletions.
4 changes: 2 additions & 2 deletions dbms/src/Debug/MockComputeServerManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ extern const int IP_ADDRESS_NOT_ALLOWED;
} // namespace ErrorCodes
namespace tests
{
void MockComputeServerManager::addServer(String addr)
void MockComputeServerManager::addServer(const String & addr)
{
MockServerConfig config;
for (const auto & server : server_config_map)
Expand Down Expand Up @@ -135,4 +135,4 @@ String MockComputeServerManager::queryInfo()
return buf.toString();
}
} // namespace tests
} // namespace DB
} // namespace DB
4 changes: 2 additions & 2 deletions dbms/src/Debug/MockComputeServerManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class MockComputeServerManager : public ext::Singleton<MockComputeServerManager>
{
public:
/// register an server to run.
void addServer(String addr);
void addServer(const String & addr);

/// call startServers to run all servers in current test.
void startServers(const LoggerPtr & log_ptr, Context & global_context);
Expand Down Expand Up @@ -61,4 +61,4 @@ class MockComputeServerManager : public ext::Singleton<MockComputeServerManager>
std::unordered_map<size_t, std::unique_ptr<FlashGrpcServerHolder>> server_map;
std::unordered_map<size_t, MockServerConfig> server_config_map;
};
} // namespace DB::tests
} // namespace DB::tests
1 change: 0 additions & 1 deletion dbms/src/Debug/dbgFuncCoprocessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,6 @@ class UniqRawResReformatBlockOutputStream : public IProfilingBlockInputStream
tipb::SelectResponse executeDAGRequest(Context & context, const tipb::DAGRequest & dag_request, RegionID region_id, UInt64 region_version, UInt64 region_conf_version, Timestamp start_ts, std::vector<std::pair<DecodedTiKVKeyPtr, DecodedTiKVKeyPtr>> & key_ranges);
bool runAndCompareDagReq(const coprocessor::Request & req, const coprocessor::Response & res, Context & context, String & unequal_msg);
BlockInputStreamPtr outputDAGResponse(Context & context, const DAGSchema & schema, const tipb::SelectResponse & dag_response);
DAGSchema getSelectSchema(Context & context);
bool dagRspEqual(Context & context, const tipb::SelectResponse & expected, const tipb::SelectResponse & actual, String & unequal_msg);

DAGProperties getDAGProperties(const String & prop_string)
Expand Down
2 changes: 2 additions & 0 deletions dbms/src/Debug/dbgFuncCoprocessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ QueryTasks queryPlanToQueryTasks(
BlockInputStreamPtr executeQuery(Context & context, RegionID region_id, const DAGProperties & properties, QueryTasks & query_tasks, MakeResOutputStream & func_wrap_output_stream);
BlockInputStreamPtr executeMPPQuery(Context & context, const DAGProperties & properties, QueryTasks & query_tasks);
std::vector<BlockInputStreamPtr> executeMPPQueryWithMultipleContext(const DAGProperties & properties, QueryTasks & query_tasks, std::unordered_map<size_t, MockServerConfig> & server_config_map);
DAGSchema getSelectSchema(Context & context);
std::unique_ptr<ChunkCodec> getCodec(tipb::EncodeType encode_type);

namespace Debug
{
Expand Down
9 changes: 6 additions & 3 deletions dbms/src/Flash/FlashService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,8 @@ grpc::Status FlashService::Coprocessor(
CPUAffinityManager::getInstance().bindSelfGrpcThread();
LOG_FMT_DEBUG(log, "Handling coprocessor request: {}", request->DebugString());

if (!security_config->checkGrpcContext(grpc_context))
// For coprocessor test, we don't care about security config.
if (unlikely(!context->isCopTest() && !security_config->checkGrpcContext(grpc_context)))
{
return grpc::Status(grpc::PERMISSION_DENIED, tls_err_msg);
}
Expand All @@ -119,6 +120,8 @@ grpc::Status FlashService::Coprocessor(
GET_METRIC(tiflash_coprocessor_response_bytes).Increment(response->ByteSizeLong());
});

context->setMockStorage(mock_storage);

grpc::Status ret = executeInThreadPool(*cop_pool, [&] {
auto [db_context, status] = createDBContext(grpc_context);
if (!status.ok())
Expand Down Expand Up @@ -430,8 +433,8 @@ std::tuple<ContextPtr, grpc::Status> FlashService::createDBContext(const grpc::S
std::string client_ip = peer.substr(pos + 1);
Poco::Net::SocketAddress client_address(client_ip);

// For MPP test, we don't care about security config.
if (!context->isMPPTest())
// For MPP or Cop test, we don't care about security config.
if (likely(!context->isTest()))
tmp_context->setUser(user, password, client_address, quota_key);

String query_id = getClientMetaVarWithDefault(grpc_context, "query_id", "");
Expand Down
20 changes: 19 additions & 1 deletion dbms/src/Flash/tests/gtest_compute_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -381,5 +381,23 @@ try
}
}
CATCH

TEST_F(ComputeServerRunner, runCoprocessor)
try
{
// In coprocessor test, we only need to start 1 server.
startServers(1);
{
auto request = context
.scan("test_db", "l_table")
.build(context);

auto expected_cols = {
toNullableVec<String>({{"banana", {}, "banana"}}),
toNullableVec<String>({{"apple", {}, "banana"}})};
ASSERT_COLUMNS_EQ_UR(expected_cols, executeCoprocessorTask(request));
}
}
CATCH
} // namespace tests
} // namespace DB
} // namespace DB
10 changes: 10 additions & 0 deletions dbms/src/Interpreters/Context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1866,6 +1866,16 @@ void Context::setExecutorTest()
test_mode = executor_test;
}

bool Context::isCopTest() const
{
return test_mode == cop_test;
}

void Context::setCopTest()
{
test_mode = cop_test;
}

bool Context::isTest() const
{
return test_mode != non_test;
Expand Down
3 changes: 3 additions & 0 deletions dbms/src/Interpreters/Context.h
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ class Context
{
non_test,
mpp_test,
cop_test,
executor_test,
cancel_test
};
Expand Down Expand Up @@ -478,6 +479,8 @@ class Context
void setCancelTest();
bool isExecutorTest() const;
void setExecutorTest();
void setCopTest();
bool isCopTest() const;
bool isTest() const;

void setMockStorage(MockStorage & mock_storage_);
Expand Down
13 changes: 13 additions & 0 deletions dbms/src/Server/MockComputeClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,19 @@ class MockComputeClient
}
}

coprocessor::Response runCoprocessor(std::shared_ptr<coprocessor::Request> request)
{
coprocessor::Response response;
grpc::ClientContext context;
Status status = stub->Coprocessor(&context, *request, &response);
if (!status.ok())
{
throw Exception(fmt::format("Meet error while run coprocessor task, error code = {}, message = {}", status.error_code(), status.error_message()));
}

return response;
}

private:
std::unique_ptr<tikvpb::Tikv::Stub> stub{};
};
Expand Down
3 changes: 0 additions & 3 deletions dbms/src/TestUtils/ExecutorTestUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -164,8 +164,6 @@ void ExecutorTest::executeAndAssertRowsEqual(const std::shared_ptr<tipb::DAGRequ
});
}

namespace
{
Block mergeBlocks(Blocks blocks)
{
if (blocks.empty())
Expand Down Expand Up @@ -193,7 +191,6 @@ Block mergeBlocks(Blocks blocks)
actual_columns.push_back({std::move(actual_cols[i]), sample_block.getColumnsWithTypeAndName()[i].type, sample_block.getColumnsWithTypeAndName()[i].name, sample_block.getColumnsWithTypeAndName()[i].column_id});
return Block(actual_columns);
}
} // namespace

void readStream(Blocks & blocks, BlockInputStreamPtr stream)
{
Expand Down
2 changes: 2 additions & 0 deletions dbms/src/TestUtils/ExecutorTestUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ TiDB::TP dataTypeToTP(const DataTypePtr & type);

DB::ColumnsWithTypeAndName readBlock(BlockInputStreamPtr stream);
DB::ColumnsWithTypeAndName readBlocks(std::vector<BlockInputStreamPtr> streams);
Block mergeBlocks(Blocks blocks);


#define WRAP_FOR_DIS_ENABLE_PLANNER_BEGIN \
std::vector<bool> bools{false, true}; \
Expand Down
44 changes: 43 additions & 1 deletion dbms/src/TestUtils/MPPTaskTestUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <Debug/dbgFuncCoprocessor.h>
#include <Server/MockComputeClient.h>
#include <TestUtils/MPPTaskTestUtils.h>

namespace DB::tests
Expand Down Expand Up @@ -84,6 +86,46 @@ ColumnsWithTypeAndName MPPTaskTestUtils::exeucteMPPTasks(QueryTasks & tasks, con
return readBlocks(res);
}

ColumnsWithTypeAndName extractColumns(Context & context, const std::shared_ptr<tipb::SelectResponse> & dag_response)
{
auto codec = getCodec(dag_response->encode_type());
Blocks blocks;
auto schema = getSelectSchema(context);
for (const auto & chunk : dag_response->chunks())
blocks.emplace_back(codec->decode(chunk.rows_data(), schema));
return mergeBlocks(blocks).getColumnsWithTypeAndName();
}

ColumnsWithTypeAndName MPPTaskTestUtils::executeCoprocessorTask(std::shared_ptr<tipb::DAGRequest> & dag_request)
{
assert(server_num == 1);
auto req = std::make_shared<coprocessor::Request>();
req->set_tp(103); // 103 is COP_REQ_TYPE_DAG
auto * data = req->mutable_data();
dag_request->AppendToString(data);

DAGContext dag_context(*dag_request);

TiFlashTestEnv::getGlobalContext(test_meta.context_idx).setDAGContext(&dag_context);
TiFlashTestEnv::getGlobalContext(test_meta.context_idx).setCopTest();

MockComputeServerManager::instance().setMockStorage(context.mockStorage());

auto addr = MockComputeServerManager::instance().getServerConfigMap()[0].addr; // Since we only have started 1 server currently.
MockComputeClient client(
grpc::CreateChannel(addr, grpc::InsecureChannelCredentials()));
auto resp = client.runCoprocessor(req);
auto resp_ptr = std::make_shared<tipb::SelectResponse>();
if (unlikely(!resp_ptr->ParseFromString(resp.data())))
{
throw Exception("Incorrect json response data from Coprocessor.", ErrorCodes::BAD_ARGUMENTS);
}
else
{
return extractColumns(TiFlashTestEnv::getGlobalContext(test_meta.context_idx), resp_ptr);
}
}

String MPPTaskTestUtils::queryInfo(size_t server_id)
{
FmtBuffer buf;
Expand Down Expand Up @@ -124,4 +166,4 @@ ::testing::AssertionResult MPPTaskTestUtils::assertQueryActive(size_t start_ts)
}
return ::testing::AssertionSuccess();
}
} // namespace DB::tests
} // namespace DB::tests
3 changes: 3 additions & 0 deletions dbms/src/TestUtils/MPPTaskTestUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,9 @@ class MPPTaskTestUtils : public ExecutorTest
std::tuple<size_t, std::vector<BlockInputStreamPtr>> prepareMPPStreams(DAGRequestBuilder builder);

ColumnsWithTypeAndName exeucteMPPTasks(QueryTasks & tasks, const DAGProperties & properties, std::unordered_map<size_t, MockServerConfig> & server_config_map);

ColumnsWithTypeAndName executeCoprocessorTask(std::shared_ptr<tipb::DAGRequest> & dag_request);

static ::testing::AssertionResult assertQueryCancelled(size_t start_ts);
static ::testing::AssertionResult assertQueryActive(size_t start_ts);
static String queryInfo(size_t server_id);
Expand Down

0 comments on commit c003541

Please sign in to comment.