Skip to content

Commit

Permalink
ARROW-10567: [C++] Add multiple perf runs options for higher precision reporting
Browse files Browse the repository at this point in the history

I've found that these benchmarks have high variance on the Linux platforms (Ubuntu 18.04-based) where I've tested them, and increasing the number of iterations of the test produces more precise results.

Closes #8649 from wesm/flight-benchmark-config-options

Authored-by: Wes McKinney <[email protected]>
Signed-off-by: David Li <[email protected]>
  • Loading branch information
wesm authored and lidavidm committed Nov 12, 2020
1 parent 9baa123 commit 40a7a6f
Showing 1 changed file with 31 additions and 18 deletions.
49 changes: 31 additions & 18 deletions cpp/src/arrow/flight/flight_benchmark.cc
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ DEFINE_string(server_host, "",
"An existing performance server to benchmark against (leave blank to spawn "
"one automatically)");
DEFINE_int32(server_port, 31337, "The port to connect to");
DEFINE_int32(num_perf_runs, 1,
"Number of times to run the perf test to "
"increase precision");
DEFINE_int32(num_servers, 1, "Number of performance servers to run");
DEFINE_int32(num_streams, 4, "Number of streams for each server");
DEFINE_int32(num_threads, 4, "Number of concurrent gets");
Expand Down Expand Up @@ -120,7 +123,7 @@ Status WaitForReady(FlightClient* client) {
arrow::Result<PerformanceResult> RunDoGetTest(FlightClient* client,
const perf::Token& token,
const FlightEndpoint& endpoint,
PerformanceStats& stats) {
PerformanceStats* stats) {
std::unique_ptr<FlightStreamReader> reader;
RETURN_NOT_OK(client->DoGet(endpoint.ticket, &reader));

Expand All @@ -139,7 +142,7 @@ arrow::Result<PerformanceResult> RunDoGetTest(FlightClient* client,
while (true) {
timer.Start();
RETURN_NOT_OK(reader->Next(&batch));
stats.AddLatency(timer.Stop());
stats->AddLatency(timer.Stop());
if (!batch.data) {
break;
}
Expand All @@ -166,7 +169,7 @@ arrow::Result<PerformanceResult> RunDoGetTest(FlightClient* client,
arrow::Result<PerformanceResult> RunDoPutTest(FlightClient* client,
const perf::Token& token,
const FlightEndpoint& endpoint,
PerformanceStats& stats) {
PerformanceStats* stats) {
std::unique_ptr<FlightStreamWriter> writer;
std::unique_ptr<FlightMetadataReader> reader;
std::shared_ptr<Schema> schema =
Expand Down Expand Up @@ -209,7 +212,7 @@ arrow::Result<PerformanceResult> RunDoPutTest(FlightClient* client,
} else {
timer.Start();
RETURN_NOT_OK(writer->WriteRecordBatch(*batch));
stats.AddLatency(timer.Stop());
stats->AddLatency(timer.Stop());
num_records += length;
// Hard-coded
num_bytes += length * bytes_per_record;
Expand All @@ -222,10 +225,7 @@ arrow::Result<PerformanceResult> RunDoPutTest(FlightClient* client,
return PerformanceResult{num_batches, num_records, num_bytes};
}

Status RunPerformanceTest(FlightClient* client, bool test_put) {
// TODO(wesm): Multiple servers
// std::vector<std::unique_ptr<TestServer>> servers;

Status DoSinglePerfRun(FlightClient* client, bool test_put, PerformanceStats* stats) {
// schema not needed
perf::Perf perf;
perf.set_stream_count(FLAGS_num_streams);
Expand All @@ -245,7 +245,8 @@ Status RunPerformanceTest(FlightClient* client, bool test_put) {
ipc::DictionaryMemo dict_memo;
RETURN_NOT_OK(plan->GetSchema(&dict_memo, &schema));

PerformanceStats stats;
int64_t start_total_records = stats->total_records;

auto test_loop = test_put ? &RunDoPutTest : &RunDoGetTest;
auto ConsumeStream = [&stats, &test_loop](const FlightEndpoint& endpoint) {
// TODO(wesm): Use location from endpoint, same host/port for now
Expand All @@ -258,14 +259,11 @@ Status RunPerformanceTest(FlightClient* client, bool test_put) {
const auto& result = test_loop(client.get(), token, endpoint, stats);
if (result.ok()) {
const PerformanceResult& perf = result.ValueOrDie();
stats.Update(perf.num_batches, perf.num_records, perf.num_bytes);
stats->Update(perf.num_batches, perf.num_records, perf.num_bytes);
}
return result.status();
};

StopWatch timer;
timer.Start();

// XXX(wesm): Serial version for debugging
// for (const auto& endpoint : plan->endpoints()) {
// RETURN_NOT_OK(ConsumeStream(endpoint));
Expand All @@ -283,18 +281,33 @@ Status RunPerformanceTest(FlightClient* client, bool test_put) {
RETURN_NOT_OK(task.status());
}

// Check that number of rows read / written is as expected
int64_t records_for_run = stats->total_records - start_total_records;
if (records_for_run != static_cast<int64_t>(plan->total_records())) {
return Status::Invalid("Did not consume expected number of records");
}

return Status::OK();
}

Status RunPerformanceTest(FlightClient* client, bool test_put) {
StopWatch timer;
timer.Start();

PerformanceStats stats;
for (int i = 0; i < FLAGS_num_perf_runs; ++i) {
RETURN_NOT_OK(DoSinglePerfRun(client, test_put, &stats));
}

// Elapsed time in seconds
uint64_t elapsed_nanos = timer.Stop();
double time_elapsed =
static_cast<double>(elapsed_nanos) / static_cast<double>(1000000000);

constexpr double kMegabyte = static_cast<double>(1 << 20);

// Check that number of rows read / written is as expected
if (stats.total_records != static_cast<int64_t>(plan->total_records())) {
return Status::Invalid("Did not consume expected number of records");
}

std::cout << "Number of perf runs: " << FLAGS_num_perf_runs << std::endl;
std::cout << "Number of concurrent gets/puts: " << FLAGS_num_threads << std::endl;
std::cout << "Batch size: " << stats.total_bytes / stats.total_batches << std::endl;
if (FLAGS_test_put) {
std::cout << "Batches written: " << stats.total_batches << std::endl;
Expand Down

0 comments on commit 40a7a6f

Please sign in to comment.