Skip to content

Commit

Permalink
xx
Browse files: browse the repository at this point in the history
  • Loading branch information
zuochunwei committed May 15, 2023
1 parent 515aa08 commit 649d800
Showing 1 changed file with 35 additions and 4 deletions.
39 changes: 35 additions & 4 deletions cpp/velox/benchmarks/QueryBenchmark.cc
Original file line number | Diff line number | Diff line change
Expand Up @@ -30,6 +30,34 @@ const std::string getFilePath(const std::string& fileName) {
return filePath;
}

/// Builds a first-stage result iterator for the plan held by `backend`.
/// Shared by the unit tests and this benchmark.
///
/// @param allocator    Allocator wrapped into a Velox aggregate memory pool.
/// @param backend      Supplies the plan (`getPlan()`) and the configuration
///                     map (`getConfMap()`); also handed to the returned
///                     ResultIterator.
/// @param setScanInfos Split infos given to the iterator in place of the ones
///                     extracted from the converted plan.
/// @param veloxPlan    Output parameter: receives the converted Velox plan.
/// @return A ResultIterator wrapping a WholeStageResultIteratorFirstStage.
std::shared_ptr<ResultIterator> getResultIterator(
    MemoryAllocator* allocator,
    std::shared_ptr<Backend> backend,
    const std::vector<std::shared_ptr<velox::substrait::SplitInfo>>& setScanInfos,
    std::shared_ptr<const facebook::velox::core::PlanNode>& veloxPlan) {
  // Pools: an aggregate child for the iterator context, and the default leaf
  // pool, which is shared by the plan converter and the iterator.
  auto aggregatePool = asWrappedVeloxAggregateMemoryPool(allocator);
  auto iteratorCtxPool = aggregatePool->addAggregateChild("query_benchmark_result_iterator");
  auto leafPool = getDefaultVeloxLeafMemoryPool();

  // The converter is given an empty list of input iterators.
  std::vector<std::shared_ptr<ResultIterator>> noInputIters;
  auto planConverter = std::make_unique<VeloxPlanConverter>(noInputIters, leafPool);
  veloxPlan = planConverter->toVeloxPlan(backend->getPlan());

  // Split the leaf node ids into scan ids and stream ids. The scan infos
  // collected here are not forwarded: the caller-provided setScanInfos are
  // used instead when constructing the iterator below.
  std::vector<std::shared_ptr<velox::substrait::SplitInfo>> planScanInfos;
  std::vector<velox::core::PlanNodeId> scanNodeIds;
  std::vector<velox::core::PlanNodeId> streamNodeIds;
  VeloxBackend::getInfoAndIds(
      planConverter->splitInfos(),
      veloxPlan->leafPlanNodeIds(),
      planScanInfos,
      scanNodeIds,
      streamNodeIds);

  auto firstStage = std::make_unique<WholeStageResultIteratorFirstStage>(
      iteratorCtxPool,
      leafPool,
      veloxPlan,
      scanNodeIds,
      setScanInfos,
      streamNodeIds,
      "/tmp/test-spill",
      backend->getConfMap());
  return std::make_shared<ResultIterator>(std::move(firstStage), backend);
}

auto bm = [](::benchmark::State& state,
const std::vector<std::string>& datasetPaths,
const std::string& jsonFile,
Expand All @@ -52,17 +80,20 @@ auto bm = [](::benchmark::State& state,
scanInfos.emplace_back(getSplitInfosFromFile(datasetPath, fileFormat));
std::cout << "== not /" << std::endl;
}
std::cout << datasetPath << std::endl;
}

for (auto _ : state) {
state.PauseTiming();
auto backend = std::dynamic_pointer_cast<gluten::VeloxBackend>(gluten::createBackend());
state.ResumeTiming();

backend->parsePlan(plan->data(), plan->size());
auto resultIter = backend->getResultIterator(gluten::defaultMemoryAllocator().get(), scanInfos);
auto outputSchema = backend->GetOutputSchema();
while (resultIter->HasNext()) {
auto array = resultIter->Next()->exportArrowArray();
std::shared_ptr<const facebook::velox::core::PlanNode> veloxPlan;
auto resultIter = getResultIterator(gluten::defaultMemoryAllocator().get(), backend, scanInfos, veloxPlan);
auto outputSchema = getOutputSchema(veloxPlan);
while (resultIter->hasNext()) {
auto array = resultIter->next()->exportArrowArray();
auto maybeBatch = arrow::ImportRecordBatch(array.get(), outputSchema);
if (!maybeBatch.ok()) {
state.SkipWithError(maybeBatch.status().message().c_str());
Expand Down

0 comments on commit 649d800

Please sign in to comment.