From 1442cf57ec3e14f99f2b7a3b46327c10d10b909c Mon Sep 17 00:00:00 2001 From: Alex Kremer Date: Thu, 4 Jan 2024 14:37:25 +0000 Subject: [PATCH 1/4] Add batching to writes --- CMakeLists.txt | 1 + example-config.json | 3 +- src/data/cassandra/SettingsProvider.cpp | 2 +- src/data/cassandra/impl/Cluster.cpp | 1 + src/data/cassandra/impl/Cluster.h | 5 ++ src/data/cassandra/impl/ExecutionStrategy.h | 42 +++++++----- src/util/Batching.h | 46 +++++++++++++ unittests/util/BatchingTests.cpp | 76 +++++++++++++++++++++ 8 files changed, 158 insertions(+), 18 deletions(-) create mode 100644 src/util/Batching.h create mode 100644 unittests/util/BatchingTests.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index 609849770..d8b06e485 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -184,6 +184,7 @@ if (tests) unittests/SubscriptionTests.cpp unittests/SubscriptionManagerTests.cpp unittests/util/AssertTests.cpp + unittests/util/BatchingTests.cpp unittests/util/TestObject.cpp unittests/util/StringUtils.cpp unittests/util/prometheus/CounterTests.cpp diff --git a/example-config.json b/example-config.json index ea7b77c97..46772280e 100644 --- a/example-config.json +++ b/example-config.json @@ -16,7 +16,8 @@ // // Advanced options. USE AT OWN RISK: // --- - "core_connections_per_host": 1 // Defaults to 1 + "core_connections_per_host": 1, // Defaults to 1 + "write_batch_size": 20 // Defaults to 20 // // Below options will use defaults from cassandra driver if left unspecified. // See https://docs.datastax.com/en/developer/cpp-driver/2.17/api/struct.CassCluster/ for details. diff --git a/src/data/cassandra/SettingsProvider.cpp b/src/data/cassandra/SettingsProvider.cpp index fce95c9d9..0789f90bf 100644 --- a/src/data/cassandra/SettingsProvider.cpp +++ b/src/data/cassandra/SettingsProvider.cpp @@ -122,8 +122,8 @@ SettingsProvider::parseSettings() const config_.valueOr("max_read_requests_outstanding", settings.maxReadRequestsOutstanding); settings.coreConnectionsPerHost = config_.valueOr("core_connections_per_host", settings.coreConnectionsPerHost); - settings.queueSizeIO = config_.maybeValue("queue_size_io"); + settings.writeBatchSize = config_.valueOr("write_batch_size", settings.writeBatchSize); auto const connectTimeoutSecond = config_.maybeValue("connect_timeout"); if (connectTimeoutSecond) diff --git a/src/data/cassandra/impl/Cluster.cpp b/src/data/cassandra/impl/Cluster.cpp index e93f02b35..a9d2a2ecf 100644 --- a/src/data/cassandra/impl/Cluster.cpp +++ b/src/data/cassandra/impl/Cluster.cpp @@ -83,6 +83,7 @@ Cluster::Cluster(Settings const& settings) : ManagedObject{cass_cluster_new(), c LOG(log_.info()) << "Threads: " << settings.threads; LOG(log_.info()) << "Core connections per host: " << settings.coreConnectionsPerHost; LOG(log_.info()) << "IO queue size: " << queueSize; + LOG(log_.info()) << "Batched writes auto-chunk size: " << settings.writeBatchSize; } void diff --git a/src/data/cassandra/impl/Cluster.h b/src/data/cassandra/impl/Cluster.h index 3fbc3e16d..aa1b914bc 100644 --- a/src/data/cassandra/impl/Cluster.h +++ b/src/data/cassandra/impl/Cluster.h @@ -42,6 +42,8 @@ struct Settings { static constexpr std::size_t DEFAULT_CONNECTION_TIMEOUT = 10000; static constexpr uint32_t DEFAULT_MAX_WRITE_REQUESTS_OUTSTANDING = 10'000; static constexpr uint32_t DEFAULT_MAX_READ_REQUESTS_OUTSTANDING = 100'000; + static constexpr std::size_t DEFAULT_BATCH_SIZE = 20; + /** * @brief Represents the configuration of contact points for cassandra. */ @@ -81,6 +83,9 @@ struct Settings { /** @brief The number of connection per host to always have active */ uint32_t coreConnectionsPerHost = 1u; + /** @brief Size of batches when writing */ + std::size_t writeBatchSize = DEFAULT_BATCH_SIZE; + /** @brief Size of the IO queue */ std::optional queueSizeIO{}; diff --git a/src/data/cassandra/impl/ExecutionStrategy.h b/src/data/cassandra/impl/ExecutionStrategy.h index 18b4d7c5c..75cb5ac3c 100644 --- a/src/data/cassandra/impl/ExecutionStrategy.h +++ b/src/data/cassandra/impl/ExecutionStrategy.h @@ -25,6 +25,7 @@ #include "data/cassandra/Types.h" #include "data/cassandra/impl/AsyncExecutor.h" #include "util/Assert.h" +#include "util/Batching.h" #include "util/Expected.h" #include "util/log/Logger.h" @@ -59,6 +60,8 @@ class DefaultExecutionStrategy { std::uint32_t maxReadRequestsOutstanding_; std::atomic_uint32_t numReadRequestsOutstanding_ = 0; + std::size_t writeBatchSize_; + std::mutex throttleMutex_; std::condition_variable throttleCv_; @@ -93,6 +96,7 @@ class DefaultExecutionStrategy { ) : maxWriteRequestsOutstanding_{settings.maxWriteRequestsOutstanding} , maxReadRequestsOutstanding_{settings.maxReadRequestsOutstanding} + , writeBatchSize_{settings.writeBatchSize} , work_{ioc_} , handle_{std::cref(handle)} , thread_{[this]() { ioc_.run(); }} @@ -214,22 +218,28 @@ class DefaultExecutionStrategy { if (statements.empty()) return; - auto const startTime = std::chrono::steady_clock::now(); - - incrementOutstandingRequestCount(); - - counters_->registerWriteStarted(); - // Note: lifetime is controlled by std::shared_from_this internally - AsyncExecutor, HandleType>::run( - ioc_, - handle_, - std::move(statements), - [this, startTime](auto const&) { - decrementOutstandingRequestCount(); - counters_->registerWriteFinished(startTime); - }, - [this]() { counters_->registerWriteRetry(); } - ); + util::forEachBatch(std::move(statements), writeBatchSize_, [this](auto begin, auto end) { + auto const startTime = std::chrono::steady_clock::now(); + auto chunk = std::vector{}; + + chunk.reserve(std::distance(begin, end)); + std::move(begin, end, std::back_inserter(chunk)); + + incrementOutstandingRequestCount(); + counters_->registerWriteStarted(); + + // Note: lifetime is controlled by std::shared_from_this internally + AsyncExecutor, HandleType>::run( + ioc_, + handle_, + std::move(chunk), + [this, startTime](auto const&) { + decrementOutstandingRequestCount(); + counters_->registerWriteFinished(startTime); + }, + [this]() { counters_->registerWriteRetry(); } + ); + }); } /** diff --git a/src/util/Batching.h b/src/util/Batching.h new file mode 100644 index 000000000..005e422f8 --- /dev/null +++ b/src/util/Batching.h @@ -0,0 +1,46 @@ +//------------------------------------------------------------------------------ +/* + This file is part of clio: https://github.com/XRPLF/clio + Copyright (c) 2024, the clio developers. + + Permission to use, copy, modify, and distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#pragma once + +#include "util/Assert.h" + +#include +#include +#include +#include + +namespace util { + +void +forEachBatch(std::ranges::forward_range auto&& container, std::size_t batchSize, auto&& fn) +{ + ASSERT(batchSize > 0, "Batch size must be greater than 0"); + + auto const totalSize = container.size(); + auto const batches = totalSize / batchSize + (totalSize % batchSize ? 1 : 0); + + for (auto i = 0u; i < batches; ++i) { + auto const start = i * batchSize; + auto const end = std::min(start + batchSize, totalSize); + fn(container.begin() + start, container.begin() + end); + } +} + +} // namespace util diff --git a/unittests/util/BatchingTests.cpp b/unittests/util/BatchingTests.cpp new file mode 100644 index 000000000..f33687d02 --- /dev/null +++ b/unittests/util/BatchingTests.cpp @@ -0,0 +1,76 @@ +//------------------------------------------------------------------------------ +/* + This file is part of clio: https://github.com/XRPLF/clio + Copyright (c) 2023, the clio developers. + + Permission to use, copy, modify, and distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#include "util/Batching.h" + +#include + +#include + +TEST(BatchingTests, simpleBatch) +{ + std::vector const input{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}; + std::vector output; + + util::forEachBatch(input, 3, [&](auto begin, auto end) { + std::copy(begin, end, std::back_inserter(output)); + EXPECT_LE(std::distance(begin, end), 3); + }); + + EXPECT_EQ(input, output); +} + +TEST(BatchingTests, simpleBatchEven) +{ + std::vector const input{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}; + std::vector output; + + util::forEachBatch(input, 2, [&](auto begin, auto end) { + std::copy(begin, end, std::back_inserter(output)); + EXPECT_LE(std::distance(begin, end), 2); + }); + + EXPECT_EQ(input, output); +} + +TEST(BatchingTests, batchSizeBiggerThanInput) +{ + std::vector const input{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}; + std::vector output; + + util::forEachBatch(input, 20, [&](auto begin, auto end) { + std::copy(begin, end, std::back_inserter(output)); + EXPECT_LE(std::distance(begin, end), 20); + }); + + EXPECT_EQ(input, output); +} + +TEST(BatchingTests, emptyInput) +{ + std::vector const input{}; + std::vector output; + + util::forEachBatch(input, 20, [&](auto begin, auto end) { + std::copy(begin, end, std::back_inserter(output)); + ASSERT_FALSE(true) << "Should not be called"; + }); + + EXPECT_EQ(input, output); +} From b7b7fa7e8afe8104b061a6d5ae6ed2bbb9aa6957 Mon Sep 17 00:00:00 2001 From: Alex Kremer Date: Thu, 4 Jan 2024 14:42:51 +0000 Subject: [PATCH 2/4] Fix issues from self-review --- src/util/Batching.h | 1 - unittests/util/BatchingTests.cpp | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/src/util/Batching.h b/src/util/Batching.h index 005e422f8..55cd644e6 100644 --- a/src/util/Batching.h +++ b/src/util/Batching.h @@ -24,7 +24,6 @@ #include #include #include -#include namespace util { diff --git a/unittests/util/BatchingTests.cpp b/unittests/util/BatchingTests.cpp index f33687d02..7bf122bda 100644 --- a/unittests/util/BatchingTests.cpp +++ b/unittests/util/BatchingTests.cpp @@ -1,7 +1,7 @@ //------------------------------------------------------------------------------ /* This file is part of clio: https://github.com/XRPLF/clio - Copyright (c) 2023, the clio developers. + Copyright (c) 2024, the clio developers. Permission to use, copy, modify, and distribute this software for any purpose with or without fee is hereby granted, provided that the above From c12738cd3e6cda35977a2c1cf039ac3f542346e4 Mon Sep 17 00:00:00 2001 From: Alex Kremer Date: Thu, 4 Jan 2024 21:46:09 +0000 Subject: [PATCH 3/4] Make impl for forward_iterator --- src/util/Batching.h | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/src/util/Batching.h b/src/util/Batching.h index 55cd644e6..aa878812a 100644 --- a/src/util/Batching.h +++ b/src/util/Batching.h @@ -22,7 +22,6 @@ #include "util/Assert.h" #include -#include #include namespace util { @@ -32,13 +31,19 @@ forEachBatch(std::ranges::forward_range auto&& container, std::size_t batchSize, { ASSERT(batchSize > 0, "Batch size must be greater than 0"); - auto const totalSize = container.size(); - auto const batches = totalSize / batchSize + (totalSize % batchSize ? 1 : 0); + auto to = std::begin(container); + auto end = std::end(container); - for (auto i = 0u; i < batches; ++i) { - auto const start = i * batchSize; - auto const end = std::min(start + batchSize, totalSize); - fn(container.begin() + start, container.begin() + end); + while (to != end) { + auto from = to; + + auto cnt = batchSize; + while (to != end and cnt > 0) { + ++to; + --cnt; + } + + std::invoke(fn, from, to); } } From 926ab64a56847f762a0cc1027bc8e704b56fa0b3 Mon Sep 17 00:00:00 2001 From: Alex Kremer Date: Thu, 4 Jan 2024 21:50:35 +0000 Subject: [PATCH 4/4] Add missing includes --- src/util/Batching.h | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/util/Batching.h b/src/util/Batching.h index aa878812a..f0931246a 100644 --- a/src/util/Batching.h +++ b/src/util/Batching.h @@ -22,6 +22,8 @@ #include "util/Assert.h" #include +#include +#include #include namespace util {