diff --git a/cpp/velox/benchmarks/QueryBenchmark.cc b/cpp/velox/benchmarks/QueryBenchmark.cc index 791f2540b1e21..8ad8fbe3ba112 100644 --- a/cpp/velox/benchmarks/QueryBenchmark.cc +++ b/cpp/velox/benchmarks/QueryBenchmark.cc @@ -30,6 +30,34 @@ const std::string getFilePath(const std::string& fileName) { return filePath; } +// Used by unit test and benchmark. +std::shared_ptr getResultIterator( + MemoryAllocator* allocator, + std::shared_ptr backend, + const std::vector>& setScanInfos, + std::shared_ptr& veloxPlan) { + auto veloxPool = asWrappedVeloxAggregateMemoryPool(allocator); + auto ctxPool =veloxPool->addAggregateChild("query_benchmark_result_iterator"); + auto resultPool = getDefaultVeloxLeafMemoryPool(); + + std::vector> inputIter; + auto veloxPlanConverter = std::make_unique(inputIter, resultPool); + veloxPlan = veloxPlanConverter->toVeloxPlan(backend->getPlan()); + + // In test, use setScanInfos to replace the one got from Substrait. + std::vector> scanInfos; + std::vector scanIds; + std::vector streamIds; + + // Separate the scan ids and stream ids, and get the scan infos. + VeloxBackend::getInfoAndIds( + veloxPlanConverter->splitInfos(), veloxPlan->leafPlanNodeIds(), scanInfos, scanIds, streamIds); + + auto wholestageIter = std::make_unique( + ctxPool, resultPool, veloxPlan, scanIds, setScanInfos, streamIds, "/tmp/test-spill", backend->getConfMap()); + return std::make_shared(std::move(wholestageIter), backend); +} + auto bm = [](::benchmark::State& state, const std::vector& datasetPaths, const std::string& jsonFile, @@ -52,17 +80,20 @@ auto bm = [](::benchmark::State& state, scanInfos.emplace_back(getSplitInfosFromFile(datasetPath, fileFormat)); std::cout << "== not /" << std::endl; } + std::cout << datasetPath << std::endl; } for (auto _ : state) { state.PauseTiming(); auto backend = std::dynamic_pointer_cast(gluten::createBackend()); state.ResumeTiming(); + backend->parsePlan(plan->data(), plan->size()); - auto resultIter = backend->getResultIterator(gluten::defaultMemoryAllocator().get(), scanInfos); - auto outputSchema = backend->GetOutputSchema(); - while (resultIter->HasNext()) { - auto array = resultIter->Next()->exportArrowArray(); + std::shared_ptr veloxPlan; + auto resultIter = getResultIterator(gluten::defaultMemoryAllocator().get(), backend, scanInfos, veloxPlan); + auto outputSchema = getOutputSchema(veloxPlan); + while (resultIter->hasNext()) { + auto array = resultIter->next()->exportArrowArray(); auto maybeBatch = arrow::ImportRecordBatch(array.get(), outputSchema); if (!maybeBatch.ok()) { state.SkipWithError(maybeBatch.status().message().c_str());