Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Automatically configure threadpool size based on physical cpu core count. #60

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ cc_library(
"//src/coordinator:util",
"//src/utils:string_interning",
"//vmsdk/src:command_parser",
"//vmsdk/src:concurrency",
"//vmsdk/src:latency_sampler",
"//vmsdk/src:log",
"//vmsdk/src:managed_pointers",
Expand Down Expand Up @@ -281,4 +282,4 @@ proto_library(
cc_proto_library(
name = "index_schema_cc_proto",
deps = [":index_schema_proto"],
)
)
7 changes: 4 additions & 3 deletions src/valkey_search.cc
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
#include "src/utils/string_interning.h"
#include "src/vector_externalizer.h"
#include "vmsdk/src/command_parser.h"
#include "vmsdk/src/concurrency.h"
#include "vmsdk/src/latency_sampler.h"
#include "vmsdk/src/log.h"
#include "vmsdk/src/managed_pointers.h"
Expand All @@ -85,8 +86,8 @@ constexpr absl::string_view kUseCoordinator{"--use-coordinator"};
constexpr absl::string_view kLogLevel{"--log-level"};

struct Parameters {
int reader_threads{10};
int writer_threads{30};
int reader_threads{static_cast<int>(vmsdk::GetPhysicalCPUCoresCount())};
int writer_threads{static_cast<int>(vmsdk::GetPhysicalCPUCoresCount())};
std::optional<int> threads;
bool use_coordinator{false};
std::optional<std::string> log_level;
Expand Down Expand Up @@ -547,4 +548,4 @@ void ValkeySearch::OnUnload(RedisModuleCtx *ctx) {
reader_thread_pool_ = nullptr;
}

} // namespace valkey_search
} // namespace valkey_search
12 changes: 11 additions & 1 deletion vmsdk/src/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,16 @@ cc_library(
],
)

cc_library(
name = "concurrency",
srcs = [
"concurrency.cc",
],
hdrs = [
"concurrency.h",
],
)

cc_library(
name = "module",
srcs = ["module.cc"],
Expand Down Expand Up @@ -181,4 +191,4 @@ cc_library(
"//vmsdk/src/valkey_module_api:valkey_module",
"@com_google_absl//absl/strings:string_view",
],
)
)
71 changes: 71 additions & 0 deletions vmsdk/src/concurrency.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Copyright (c) 2025, ValkeySearch contributors
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* * Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* * Neither the name of Redis nor the names of its contributors may be used
* to endorse or promote products derived from this software without
* specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/

#include "vmsdk/src/concurrency.h"

#include <fstream>
#include <string>
#include <thread>
#include <unordered_map>

namespace vmsdk {

size_t GetPhysicalCPUCoresCount() {
#ifdef __linux__
// Linux-specific implementation
std::ifstream cpuinfo("/proc/cpuinfo");
if (!cpuinfo.is_open()) {
return 0; // Could not read /proc/cpuinfo
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suggest that we emit some error message and return std::thread::hardware_concurrency()

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@yairgott a std:.error message (or) run time error message (or) just print a standard error message in console before you return the std::thread::hardware_concurrency()?, could you be little more specific on what you mean by "emit"?.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think i will probably use some thing like this here: wdyt?.
std::cerr << "Warning: Could not read /proc/cpuinfo. Falling back to std::thread::hardware_concurrency().\n";
return std::thread::hardware_concurrency();

}
std::string line;
int physical_id = -1;
int cores_per_cpu = -1;
std::unordered_map<int, int> physical_cpu_cores;
size_t total_physical_cores = 0;

while (std::getline(cpuinfo, line)) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lets break the parsing into its own function and add a parsing unittest.

if (line.find("physical id") != std::string::npos) {
physical_id = std::stoi(line.substr(line.find(':') + 1));
} else if (line.find("cpu cores") != std::string::npos) {
cores_per_cpu = std::stoi(line.substr(line.find(':') + 1));
if (physical_id != -1 && cores_per_cpu != -1) {
physical_cpu_cores[physical_id] = cores_per_cpu;
}
}
}
for (const auto& [id, core_count] : physical_cpu_cores) {
total_physical_cores += core_count;
}
return total_physical_cores;
#else
// Non-Linux platforms
return std::thread::hardware_concurrency();
#endif
}
} // namespace vmsdk
42 changes: 42 additions & 0 deletions vmsdk/src/concurrency.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright (c) 2025, ValkeySearch contributors
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* * Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* * Neither the name of Redis nor the names of its contributors may be used
* to endorse or promote products derived from this software without
* specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/

#ifndef VMSDK_SRC_CONCURRENCY_H_
#define VMSDK_SRC_CONCURRENCY_H_

#include <cstddef> // For size_t

namespace vmsdk {

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lets move all the threading related logic from utils.h/cc to here: https://github.com/valkey-io/valkey-search/blob/main/vmsdk/src/utils.h#L55-L108

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO, We could do this in another PR, there may be some changes in the over all code base, if i do this, specially when i try to change the utils.h/cc dependent src files, i perhaps have to do some more changes, so in my suggestion is we could keep it for refractoring issue and i would gladly work on that, Wdyt? just to keep it simple for this specific issue.

// Returns the number of physical CPU cores
size_t GetPhysicalCPUCoresCount();

} // namespace vmsdk

#endif // VMSDK_SRC_CONCURRENCY_H_