From 3a901eb5225e86b78d725a8b991b70ec46447517 Mon Sep 17 00:00:00 2001 From: Yingchun Lai Date: Tue, 6 Feb 2024 11:02:59 +0800 Subject: [PATCH 1/3] refactor(table_envs): Unify and remove duplicate variables (#1890) - Unify variables: | old variable name | unified variable name | value | |-------------------------------------------------|---------------------------------------------|---------------------------------------------| | ROCKSDB_ENV_USAGE_SCENARIO_KEY | ROCKSDB_USAGE_SCENARIO | rocksdb.usage_scenario | | ROCKDB_CHECKPOINT_RESERVE_MIN_COUNT | ROCKSDB_CHECKPOINT_RESERVE_MIN_COUNT | rocksdb.checkpoint.reserve_min_count | | ROCKDB_CHECKPOINT_RESERVE_TIME_SECONDS | ROCKDB_CHECKPOINT_RESERVE_TIME_SECONDS | rocksdb.checkpoint.reserve_time_seconds | | MANUAL_COMPACT_KEY_PREFIX | MANUAL_COMPACT_PREFIX | manual_compact. | | MANUAL_COMPACT_DISABLED_KEY | MANUAL_COMPACT_DISABLED | manual_compact.disabled | | MANUAL_COMPACT_MAX_CONCURRENT_RUNNING_COUNT_KEY | MANUAL_COMPACT_MAX_CONCURRENT_RUNNING_COUNT | manual_compact.max_concurrent_running_count | | MANUAL_COMPACT_PERIODIC_KEY_PREFIX | MANUAL_COMPACT_PERIODIC_PREFIX | manual_compact.periodic. | | MANUAL_COMPACT_PERIODIC_TRIGGER_TIME_KEY | MANUAL_COMPACT_PERIODIC_TRIGGER_TIME | manual_compact.periodic.trigger_time | | MANUAL_COMPACT_ONCE_KEY_PREFIX | MANUAL_COMPACT_ONCE_PREFIX | manual_compact.once. | | MANUAL_COMPACT_ONCE_TRIGGER_TIME_KEY | MANUAL_COMPACT_ONCE_TRIGGER_TIME | manual_compact.once.trigger_time | | MANUAL_COMPACT_TARGET_LEVEL_KEY | MANUAL_COMPACT_TARGET_LEVEL | target_level | | MANUAL_COMPACT_BOTTOMMOST_LEVEL_COMPACTION_KEY | MANUAL_COMPACT_BOTTOMMOST_LEVEL_COMPACTION | bottommost_level_compaction | | ROCKDB_CHECKPOINT_RESERVE_MIN_COUNT | ROCKSDB_CHECKPOINT_RESERVE_MIN_COUNT | rocksdb.checkpoint.reserve_min_count | | ROCKDB_CHECKPOINT_RESERVE_TIME_SECONDS | ROCKSDB_CHECKPOINT_RESERVE_TIME_SECONDS | rocksdb.checkpoint.reserve_time_seconds | | ROCKSDB_ENV_SLOW_QUERY_THRESHOLD | SLOW_QUERY_THRESHOLD | replica.slow_query_threshold | - Move `MANUAL_COMPACT_BOTTOMMOST_LEVEL_COMPACTION_FORCE` and `MANUAL_COMPACT_BOTTOMMOST_LEVEL_COMPACTION_SKIP` to class pegasus_manual_compact_service - Move `PEGASUS_CLUSTER_SECTION_NAME` to src/common/common.h - Move `ROCKSDB_ENV_USAGE_SCENARIO_NORMAL`, `ROCKSDB_ENV_USAGE_SCENARIO_PREFER_WRITE` and `ROCKSDB_ENV_USAGE_SCENARIO_BULK_LOAD` to class meta_store, because they are only used there. - Move `SCAN_CONTEXT_ID_VALID_MIN`, `SCAN_CONTEXT_ID_COMPLETED` and `SCAN_CONTEXT_ID_NOT_EXIST` to src/server/pegasus_scan_context.h and src/client_lib/pegasus_scanner_impl.cpp, where belongs to server and client separately. --- src/base/pegasus_const.cpp | 153 ------------------ src/base/pegasus_const.h | 97 ----------- src/client_lib/pegasus_client_impl.cpp | 7 +- src/client_lib/pegasus_scanner_impl.cpp | 8 +- src/common/common.cpp | 2 + src/common/common.h | 4 + src/common/replica_envs.cpp | 111 +++++++++++++ src/common/replica_envs.h | 9 +- src/common/replication_common.cpp | 56 ------- src/common/test/common_test.cpp | 2 - src/geo/test/geo_test.cpp | 4 +- src/ranger/ranger_resource_policy_manager.cpp | 3 +- src/redis_protocol/proxy_lib/redis_parser.cpp | 4 +- src/server/meta_store.cpp | 15 +- src/server/meta_store.h | 7 + src/server/pegasus_manual_compact_service.cpp | 23 +-- src/server/pegasus_manual_compact_service.h | 6 + src/server/pegasus_scan_context.h | 5 +- src/server/pegasus_server_impl.cpp | 120 +++++++++----- .../test/manual_compact_service_test.cpp | 79 ++++----- src/server/test/pegasus_server_impl_test.cpp | 25 +-- src/shell/main.cpp | 11 +- .../test_backup_and_restore.cpp | 1 - .../function_test/base_api/test_batch_get.cpp | 1 - src/test/function_test/base_api/test_scan.cpp | 8 +- src/test/function_test/base_api/test_ttl.cpp | 15 +- .../bulk_load/test_bulk_load.cpp | 6 +- .../function_test/restore/test_restore.cpp | 1 - src/test/function_test/utils/test_util.cpp | 4 +- src/test/kill_test/kill_testor.cpp | 4 +- 30 files changed, 340 insertions(+), 451 deletions(-) delete mode 100644 src/base/pegasus_const.cpp delete mode 100644 src/base/pegasus_const.h create mode 100644 src/common/replica_envs.cpp diff --git a/src/base/pegasus_const.cpp b/src/base/pegasus_const.cpp deleted file mode 100644 index 2a788cd24d..0000000000 --- a/src/base/pegasus_const.cpp +++ /dev/null @@ -1,153 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#include "pegasus_const.h" - -#include -#include -#include - -#include "utils/string_conv.h" - -namespace pegasus { - -// should be same with items in dsn::backup_restore_constant -const std::string ROCKSDB_ENV_RESTORE_FORCE_RESTORE("restore.force_restore"); -const std::string ROCKSDB_ENV_RESTORE_POLICY_NAME("restore.policy_name"); -const std::string ROCKSDB_ENV_RESTORE_BACKUP_ID("restore.backup_id"); - -const std::string ROCKSDB_ENV_USAGE_SCENARIO_KEY("rocksdb.usage_scenario"); -const std::string ROCKSDB_ENV_USAGE_SCENARIO_NORMAL("normal"); -const std::string ROCKSDB_ENV_USAGE_SCENARIO_PREFER_WRITE("prefer_write"); -const std::string ROCKSDB_ENV_USAGE_SCENARIO_BULK_LOAD("bulk_load"); - -/// A task of manual compaction can be triggered by update of app environment variables as follows: -/// Periodic manual compaction: triggered every day at the given `trigger_time`. -/// ``` -/// manual_compact.periodic.trigger_time=3:00,21:00 // required -/// manual_compact.periodic.target_level=-1 // optional, default -1 -/// manual_compact.periodic.bottommost_level_compaction=force // optional, default force -/// ``` -/// -/// Executed-once manual compaction: Triggered only at the specified unix time. -/// ``` -/// manual_compact.once.trigger_time=1525930272 // required -/// manual_compact.once.target_level=-1 // optional, default -1 -/// manual_compact.once.bottommost_level_compaction=force // optional, default force -/// ``` -/// -/// Disable manual compaction: -/// ``` -/// manual_compact.disabled=false // optional, default false -/// ``` -const std::string MANUAL_COMPACT_KEY_PREFIX("manual_compact."); -const std::string MANUAL_COMPACT_DISABLED_KEY(MANUAL_COMPACT_KEY_PREFIX + "disabled"); -const std::string MANUAL_COMPACT_MAX_CONCURRENT_RUNNING_COUNT_KEY(MANUAL_COMPACT_KEY_PREFIX + - "max_concurrent_running_count"); - -const std::string MANUAL_COMPACT_PERIODIC_KEY_PREFIX(MANUAL_COMPACT_KEY_PREFIX + "periodic."); -const std::string MANUAL_COMPACT_PERIODIC_TRIGGER_TIME_KEY(MANUAL_COMPACT_PERIODIC_KEY_PREFIX + - "trigger_time"); - -const std::string MANUAL_COMPACT_ONCE_KEY_PREFIX(MANUAL_COMPACT_KEY_PREFIX + "once."); -const std::string MANUAL_COMPACT_ONCE_TRIGGER_TIME_KEY(MANUAL_COMPACT_ONCE_KEY_PREFIX + - "trigger_time"); - -// see more about the following two keys in rocksdb::CompactRangeOptions -const std::string MANUAL_COMPACT_TARGET_LEVEL_KEY("target_level"); - -const std::string MANUAL_COMPACT_BOTTOMMOST_LEVEL_COMPACTION_KEY("bottommost_level_compaction"); -const std::string MANUAL_COMPACT_BOTTOMMOST_LEVEL_COMPACTION_FORCE("force"); -const std::string MANUAL_COMPACT_BOTTOMMOST_LEVEL_COMPACTION_SKIP("skip"); - -/// default ttl for items in a table. If ttl is not set for -/// * a new written item, 'default_ttl' will be applied on this item. -/// * an exist item, 'default_ttl' will be applied on this item when it was compacted. -/// <= 0 means no effect -const std::string TABLE_LEVEL_DEFAULT_TTL("default_ttl"); - -const std::string ROCKDB_CHECKPOINT_RESERVE_MIN_COUNT("rocksdb.checkpoint.reserve_min_count"); -const std::string ROCKDB_CHECKPOINT_RESERVE_TIME_SECONDS("rocksdb.checkpoint.reserve_time_seconds"); - -/// read cluster meta address from this section -const std::string PEGASUS_CLUSTER_SECTION_NAME("pegasus.clusters"); - -/// table level slow query -const std::string ROCKSDB_ENV_SLOW_QUERY_THRESHOLD("replica.slow_query_threshold"); - -/// enable or disable block cache of app -const std::string ROCKSDB_BLOCK_CACHE_ENABLED("replica.rocksdb_block_cache_enabled"); - -/// time threshold of each rocksdb iteration -const std::string - ROCKSDB_ITERATION_THRESHOLD_TIME_MS("replica.rocksdb_iteration_threshold_time_ms"); - -/// true means compaction and scan will validate partition_hash, otherwise false -const std::string SPLIT_VALIDATE_PARTITION_HASH("replica.split.validate_partition_hash"); - -/// json string which represents user specified compaction -const std::string USER_SPECIFIED_COMPACTION("user_specified_compaction"); - -const std::string READ_SIZE_THROTTLING("replica.read_throttling_by_size"); - -const std::string ROCKSDB_ALLOW_INGEST_BEHIND("rocksdb.allow_ingest_behind"); - -const std::string ROCKSDB_WRITE_BUFFER_SIZE("rocksdb.write_buffer_size"); - -const std::string ROCKSDB_NUM_LEVELS("rocksdb.num_levels"); - -const std::set ROCKSDB_DYNAMIC_OPTIONS = { - ROCKSDB_WRITE_BUFFER_SIZE, -}; -const std::set ROCKSDB_STATIC_OPTIONS = { - ROCKSDB_NUM_LEVELS, -}; - -const std::unordered_map cf_opts_setters = { - {ROCKSDB_WRITE_BUFFER_SIZE, - [](const std::string &str, rocksdb::ColumnFamilyOptions &option) -> bool { - uint64_t val = 0; - if (!dsn::buf2uint64(str, val)) { - return false; - } - option.write_buffer_size = static_cast(val); - return true; - }}, - {ROCKSDB_NUM_LEVELS, - [](const std::string &str, rocksdb::ColumnFamilyOptions &option) -> bool { - int32_t val = 0; - if (!dsn::buf2int32(str, val)) { - return false; - } - option.num_levels = val; - return true; - }}, -}; - -const std::unordered_map cf_opts_getters = { - {ROCKSDB_WRITE_BUFFER_SIZE, - [](const rocksdb::ColumnFamilyOptions &option, /*out*/ std::string &str) { - str = std::to_string(option.write_buffer_size); - }}, - {ROCKSDB_NUM_LEVELS, - [](const rocksdb::ColumnFamilyOptions &option, /*out*/ std::string &str) { - str = std::to_string(option.num_levels); - }}, -}; -} // namespace pegasus diff --git a/src/base/pegasus_const.h b/src/base/pegasus_const.h deleted file mode 100644 index e9326dfaa9..0000000000 --- a/src/base/pegasus_const.h +++ /dev/null @@ -1,97 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#pragma once - -#include -#include -#include -#include - -namespace rocksdb { -struct ColumnFamilyOptions; -} // namespace rocksdb - -namespace pegasus { - -const int SCAN_CONTEXT_ID_VALID_MIN = 0; -const int SCAN_CONTEXT_ID_COMPLETED = -1; -const int SCAN_CONTEXT_ID_NOT_EXIST = -2; - -extern const std::string ROCKSDB_ENV_RESTORE_FORCE_RESTORE; -extern const std::string ROCKSDB_ENV_RESTORE_POLICY_NAME; -extern const std::string ROCKSDB_ENV_RESTORE_BACKUP_ID; - -extern const std::string ROCKSDB_ENV_USAGE_SCENARIO_KEY; -extern const std::string ROCKSDB_ENV_USAGE_SCENARIO_NORMAL; -extern const std::string ROCKSDB_ENV_USAGE_SCENARIO_PREFER_WRITE; -extern const std::string ROCKSDB_ENV_USAGE_SCENARIO_BULK_LOAD; - -extern const std::string MANUAL_COMPACT_KEY_PREFIX; -extern const std::string MANUAL_COMPACT_DISABLED_KEY; -extern const std::string MANUAL_COMPACT_MAX_CONCURRENT_RUNNING_COUNT_KEY; - -extern const std::string MANUAL_COMPACT_PERIODIC_KEY_PREFIX; -extern const std::string MANUAL_COMPACT_PERIODIC_TRIGGER_TIME_KEY; - -extern const std::string MANUAL_COMPACT_ONCE_KEY_PREFIX; -extern const std::string MANUAL_COMPACT_ONCE_TRIGGER_TIME_KEY; - -extern const std::string MANUAL_COMPACT_TARGET_LEVEL_KEY; - -extern const std::string MANUAL_COMPACT_BOTTOMMOST_LEVEL_COMPACTION_KEY; -extern const std::string MANUAL_COMPACT_BOTTOMMOST_LEVEL_COMPACTION_FORCE; -extern const std::string MANUAL_COMPACT_BOTTOMMOST_LEVEL_COMPACTION_SKIP; - -extern const std::string TABLE_LEVEL_DEFAULT_TTL; - -extern const std::string ROCKDB_CHECKPOINT_RESERVE_MIN_COUNT; -extern const std::string ROCKDB_CHECKPOINT_RESERVE_TIME_SECONDS; - -extern const std::string PEGASUS_CLUSTER_SECTION_NAME; - -extern const std::string ROCKSDB_ENV_SLOW_QUERY_THRESHOLD; - -extern const std::string ROCKSDB_ITERATION_THRESHOLD_TIME_MS; - -extern const std::string ROCKSDB_BLOCK_CACHE_ENABLED; - -extern const std::string SPLIT_VALIDATE_PARTITION_HASH; - -extern const std::string USER_SPECIFIED_COMPACTION; - -extern const std::string READ_SIZE_THROTTLING; - -extern const std::string ROCKSDB_ALLOW_INGEST_BEHIND; - -extern const std::string ROCKSDB_WRITE_BUFFER_SIZE; - -extern const std::string ROCKSDB_NUM_LEVELS; - -extern const std::set ROCKSDB_DYNAMIC_OPTIONS; - -extern const std::set ROCKSDB_STATIC_OPTIONS; - -using cf_opts_setter = std::function; -extern const std::unordered_map cf_opts_setters; - -using cf_opts_getter = - std::function; -extern const std::unordered_map cf_opts_getters; -} // namespace pegasus diff --git a/src/client_lib/pegasus_client_impl.cpp b/src/client_lib/pegasus_client_impl.cpp index 177477beb4..b3e30f5e4c 100644 --- a/src/client_lib/pegasus_client_impl.cpp +++ b/src/client_lib/pegasus_client_impl.cpp @@ -26,7 +26,8 @@ #include #include -#include "base/pegasus_const.h" +#include "absl/strings/string_view.h" +#include "common/common.h" #include "common/replication_other_types.h" #include "common/serialization_helper/dsn.layer2_types.h" #include "pegasus/client.h" @@ -40,10 +41,8 @@ #include "runtime/task/task_code.h" #include "utils/error_code.h" #include "utils/fmt_logging.h" -#include "absl/strings/string_view.h" #include "utils/synchronize.h" #include "utils/threadpool_code.h" -#include "utils/utils.h" namespace dsn { class message_ex; @@ -65,7 +64,7 @@ pegasus_client_impl::pegasus_client_impl(const char *cluster_name, const char *a { std::vector meta_servers; dsn::replication::replica_helper::load_meta_servers( - meta_servers, PEGASUS_CLUSTER_SECTION_NAME.c_str(), cluster_name); + meta_servers, dsn::PEGASUS_CLUSTER_SECTION_NAME.c_str(), cluster_name); CHECK_GT(meta_servers.size(), 0); _meta_server.assign_group("meta-servers"); _meta_server.group_address()->add_list(meta_servers); diff --git a/src/client_lib/pegasus_scanner_impl.cpp b/src/client_lib/pegasus_scanner_impl.cpp index d161a541f2..51282e14d3 100644 --- a/src/client_lib/pegasus_scanner_impl.cpp +++ b/src/client_lib/pegasus_scanner_impl.cpp @@ -27,7 +27,6 @@ #include #include -#include "base/pegasus_const.h" #include "common/gpid.h" #include "pegasus/client.h" #include "pegasus/error.h" @@ -52,6 +51,13 @@ using namespace pegasus; namespace pegasus { namespace client { +// TODO(yingchun): There are duplicate variables in src/server/pegasus_scan_context.h, +// because this is in client library, it's better to avoid including too many headers. +// We can move it to thrift which would be included by both server and client. +static const int SCAN_CONTEXT_ID_VALID_MIN = 0; +static const int SCAN_CONTEXT_ID_COMPLETED = -1; +static const int SCAN_CONTEXT_ID_NOT_EXIST = -2; + pegasus_client_impl::pegasus_scanner_impl::pegasus_scanner_impl(::dsn::apps::rrdb_client *client, std::vector &&hash, const scan_options &options, diff --git a/src/common/common.cpp b/src/common/common.cpp index 5df3f73e61..f32a86aefc 100644 --- a/src/common/common.cpp +++ b/src/common/common.cpp @@ -29,4 +29,6 @@ DSN_DEFINE_string(replication, cluster_name, "", "name of this cluster"); CHECK(!utils::is_empty(FLAGS_cluster_name), "cluster_name is not set"); return FLAGS_cluster_name; } + +const std::string PEGASUS_CLUSTER_SECTION_NAME("pegasus.clusters"); } // namespace dsn diff --git a/src/common/common.h b/src/common/common.h index 920c333249..6ef992c6f1 100644 --- a/src/common/common.h +++ b/src/common/common.h @@ -19,10 +19,14 @@ #pragma once +#include + namespace dsn { /// Returns the cluster name (i.e, "onebox") if it's configured under /// "replication" section: /// [replication] /// cluster_name = "onebox" extern const char *get_current_cluster_name(); + +extern const std::string PEGASUS_CLUSTER_SECTION_NAME; } // namespace dsn diff --git a/src/common/replica_envs.cpp b/src/common/replica_envs.cpp new file mode 100644 index 0000000000..f1b85b0f23 --- /dev/null +++ b/src/common/replica_envs.cpp @@ -0,0 +1,111 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "common/replica_envs.h" + +#include + +namespace dsn { +const uint64_t replica_envs::MIN_SLOW_QUERY_THRESHOLD_MS = 20; +const std::string replica_envs::DENY_CLIENT_REQUEST("replica.deny_client_request"); +const std::string replica_envs::WRITE_QPS_THROTTLING("replica.write_throttling"); +const std::string replica_envs::WRITE_SIZE_THROTTLING("replica.write_throttling_by_size"); +const std::string replica_envs::SLOW_QUERY_THRESHOLD("replica.slow_query_threshold"); +const std::string replica_envs::ROCKSDB_USAGE_SCENARIO("rocksdb.usage_scenario"); +/// default ttl for items in a table. If ttl is not set for +/// * a new written item, 'default_ttl' will be applied on this item. +/// * an exist item, 'default_ttl' will be applied on this item when it was compacted. +/// <= 0 means no effect +const std::string replica_envs::TABLE_LEVEL_DEFAULT_TTL("default_ttl"); + +/// A task of manual compaction can be triggered by update of app environment variables as follows: +/// Periodic manual compaction: triggered every day at the given `trigger_time`. +/// ``` +/// manual_compact.periodic.trigger_time=3:00,21:00 // required +/// manual_compact.periodic.target_level=-1 // optional, default -1 +/// manual_compact.periodic.bottommost_level_compaction=force // optional, default force +/// ``` +/// +/// Executed-once manual compaction: Triggered only at the specified unix time. +/// ``` +/// manual_compact.once.trigger_time=1525930272 // required +/// manual_compact.once.target_level=-1 // optional, default -1 +/// manual_compact.once.bottommost_level_compaction=force // optional, default force +/// ``` +/// +/// Disable manual compaction: +/// ``` +/// manual_compact.disabled=false // optional, default false +/// ``` +const std::string MANUAL_COMPACT_PREFIX("manual_compact."); +const std::string replica_envs::MANUAL_COMPACT_DISABLED(MANUAL_COMPACT_PREFIX + "disabled"); +const std::string replica_envs::MANUAL_COMPACT_MAX_CONCURRENT_RUNNING_COUNT( + MANUAL_COMPACT_PREFIX + "max_concurrent_running_count"); +const std::string replica_envs::MANUAL_COMPACT_ONCE_PREFIX(MANUAL_COMPACT_PREFIX + "once."); +const std::string replica_envs::MANUAL_COMPACT_ONCE_TRIGGER_TIME(MANUAL_COMPACT_ONCE_PREFIX + + "trigger_time"); +// see more about the following two keys in rocksdb::CompactRangeOptions +const std::string replica_envs::MANUAL_COMPACT_TARGET_LEVEL("target_level"); +const std::string replica_envs::MANUAL_COMPACT_ONCE_TARGET_LEVEL(MANUAL_COMPACT_ONCE_PREFIX + + "target_level"); +const std::string + replica_envs::MANUAL_COMPACT_BOTTOMMOST_LEVEL_COMPACTION("bottommost_level_compaction"); +const std::string replica_envs::MANUAL_COMPACT_ONCE_BOTTOMMOST_LEVEL_COMPACTION( + MANUAL_COMPACT_ONCE_PREFIX + "bottommost_level_compaction"); +const std::string replica_envs::MANUAL_COMPACT_PERIODIC_PREFIX(MANUAL_COMPACT_PREFIX + "periodic."); +const std::string replica_envs::MANUAL_COMPACT_PERIODIC_TRIGGER_TIME( + MANUAL_COMPACT_PERIODIC_PREFIX + "trigger_time"); +const std::string replica_envs::MANUAL_COMPACT_PERIODIC_TARGET_LEVEL( + MANUAL_COMPACT_PERIODIC_PREFIX + "target_level"); +const std::string replica_envs::MANUAL_COMPACT_PERIODIC_BOTTOMMOST_LEVEL_COMPACTION( + MANUAL_COMPACT_PERIODIC_PREFIX + "bottommost_level_compaction"); +const std::string + replica_envs::ROCKSDB_CHECKPOINT_RESERVE_MIN_COUNT("rocksdb.checkpoint.reserve_min_count"); +const std::string replica_envs::ROCKSDB_CHECKPOINT_RESERVE_TIME_SECONDS( + "rocksdb.checkpoint.reserve_time_seconds"); + +/// time threshold of each rocksdb iteration +const std::string replica_envs::ROCKSDB_ITERATION_THRESHOLD_TIME_MS( + "replica.rocksdb_iteration_threshold_time_ms"); +const std::string replica_envs::ROCKSDB_BLOCK_CACHE_ENABLED("replica.rocksdb_block_cache_enabled"); +const std::string replica_envs::BUSINESS_INFO("business.info"); +const std::string replica_envs::REPLICA_ACCESS_CONTROLLER_ALLOWED_USERS( + "replica_access_controller.allowed_users"); +const std::string replica_envs::REPLICA_ACCESS_CONTROLLER_RANGER_POLICIES( + "replica_access_controller.ranger_policies"); +const std::string replica_envs::READ_QPS_THROTTLING("replica.read_throttling"); +const std::string replica_envs::READ_SIZE_THROTTLING("replica.read_throttling_by_size"); + +/// true means compaction and scan will validate partition_hash, otherwise false +const std::string + replica_envs::SPLIT_VALIDATE_PARTITION_HASH("replica.split.validate_partition_hash"); + +/// json string which represents user specified compaction +const std::string replica_envs::USER_SPECIFIED_COMPACTION("user_specified_compaction"); +const std::string replica_envs::BACKUP_REQUEST_QPS_THROTTLING("replica.backup_request_throttling"); +const std::string replica_envs::ROCKSDB_ALLOW_INGEST_BEHIND("rocksdb.allow_ingest_behind"); +const std::string replica_envs::UPDATE_MAX_REPLICA_COUNT("max_replica_count.update"); +const std::string replica_envs::ROCKSDB_WRITE_BUFFER_SIZE("rocksdb.write_buffer_size"); +const std::string replica_envs::ROCKSDB_NUM_LEVELS("rocksdb.num_levels"); + +const std::set replica_envs::ROCKSDB_DYNAMIC_OPTIONS = { + replica_envs::ROCKSDB_WRITE_BUFFER_SIZE, +}; +const std::set replica_envs::ROCKSDB_STATIC_OPTIONS = { + replica_envs::ROCKSDB_NUM_LEVELS, +}; +} // namespace dsn diff --git a/src/common/replica_envs.h b/src/common/replica_envs.h index 1db2e5399f..e808363976 100644 --- a/src/common/replica_envs.h +++ b/src/common/replica_envs.h @@ -31,15 +31,15 @@ #include namespace dsn { -namespace replication { class replica_envs { public: + static const uint64_t MIN_SLOW_QUERY_THRESHOLD_MS; + static const std::string DENY_CLIENT_REQUEST; static const std::string WRITE_QPS_THROTTLING; static const std::string WRITE_SIZE_THROTTLING; - static const uint64_t MIN_SLOW_QUERY_THRESHOLD_MS; static const std::string SLOW_QUERY_THRESHOLD; static const std::string TABLE_LEVEL_DEFAULT_TTL; static const std::string ROCKSDB_USAGE_SCENARIO; @@ -47,10 +47,14 @@ class replica_envs static const std::string ROCKSDB_CHECKPOINT_RESERVE_TIME_SECONDS; static const std::string ROCKSDB_ITERATION_THRESHOLD_TIME_MS; static const std::string ROCKSDB_BLOCK_CACHE_ENABLED; + static const std::string MANUAL_COMPACT_ONCE_PREFIX; + static const std::string MANUAL_COMPACT_PERIODIC_PREFIX; static const std::string MANUAL_COMPACT_DISABLED; + static const std::string MANUAL_COMPACT_TARGET_LEVEL; static const std::string MANUAL_COMPACT_MAX_CONCURRENT_RUNNING_COUNT; static const std::string MANUAL_COMPACT_ONCE_TRIGGER_TIME; static const std::string MANUAL_COMPACT_ONCE_TARGET_LEVEL; + static const std::string MANUAL_COMPACT_BOTTOMMOST_LEVEL_COMPACTION; static const std::string MANUAL_COMPACT_ONCE_BOTTOMMOST_LEVEL_COMPACTION; static const std::string MANUAL_COMPACT_PERIODIC_TRIGGER_TIME; static const std::string MANUAL_COMPACT_PERIODIC_TARGET_LEVEL; @@ -72,5 +76,4 @@ class replica_envs static const std::set ROCKSDB_STATIC_OPTIONS; }; -} // namespace replication } // namespace dsn diff --git a/src/common/replication_common.cpp b/src/common/replication_common.cpp index 1585aefe54..3b0a7f00a5 100644 --- a/src/common/replication_common.cpp +++ b/src/common/replication_common.cpp @@ -31,10 +31,8 @@ #include #include #include -#include #include "common/gpid.h" -#include "common/replica_envs.h" #include "common/replication_other_types.h" #include "dsn.layer2_types.h" #include "fmt/core.h" @@ -347,59 +345,5 @@ replication_options::check_if_in_black_list(const std::vector &blac return false; } -const std::string replica_envs::DENY_CLIENT_REQUEST("replica.deny_client_request"); -const std::string replica_envs::WRITE_QPS_THROTTLING("replica.write_throttling"); -const std::string replica_envs::WRITE_SIZE_THROTTLING("replica.write_throttling_by_size"); -const uint64_t replica_envs::MIN_SLOW_QUERY_THRESHOLD_MS = 20; -const std::string replica_envs::SLOW_QUERY_THRESHOLD("replica.slow_query_threshold"); -const std::string replica_envs::ROCKSDB_USAGE_SCENARIO("rocksdb.usage_scenario"); -const std::string replica_envs::TABLE_LEVEL_DEFAULT_TTL("default_ttl"); -const std::string MANUAL_COMPACT_PREFIX("manual_compact."); -const std::string replica_envs::MANUAL_COMPACT_DISABLED(MANUAL_COMPACT_PREFIX + "disabled"); -const std::string replica_envs::MANUAL_COMPACT_MAX_CONCURRENT_RUNNING_COUNT( - MANUAL_COMPACT_PREFIX + "max_concurrent_running_count"); -const std::string MANUAL_COMPACT_ONCE_PREFIX(MANUAL_COMPACT_PREFIX + "once."); -const std::string replica_envs::MANUAL_COMPACT_ONCE_TRIGGER_TIME(MANUAL_COMPACT_ONCE_PREFIX + - "trigger_time"); -const std::string replica_envs::MANUAL_COMPACT_ONCE_TARGET_LEVEL(MANUAL_COMPACT_ONCE_PREFIX + - "target_level"); -const std::string replica_envs::MANUAL_COMPACT_ONCE_BOTTOMMOST_LEVEL_COMPACTION( - MANUAL_COMPACT_ONCE_PREFIX + "bottommost_level_compaction"); -const std::string MANUAL_COMPACT_PERIODIC_PREFIX(MANUAL_COMPACT_PREFIX + "periodic."); -const std::string replica_envs::MANUAL_COMPACT_PERIODIC_TRIGGER_TIME( - MANUAL_COMPACT_PERIODIC_PREFIX + "trigger_time"); -const std::string replica_envs::MANUAL_COMPACT_PERIODIC_TARGET_LEVEL( - MANUAL_COMPACT_PERIODIC_PREFIX + "target_level"); -const std::string replica_envs::MANUAL_COMPACT_PERIODIC_BOTTOMMOST_LEVEL_COMPACTION( - MANUAL_COMPACT_PERIODIC_PREFIX + "bottommost_level_compaction"); -const std::string - replica_envs::ROCKSDB_CHECKPOINT_RESERVE_MIN_COUNT("rocksdb.checkpoint.reserve_min_count"); -const std::string replica_envs::ROCKSDB_CHECKPOINT_RESERVE_TIME_SECONDS( - "rocksdb.checkpoint.reserve_time_seconds"); -const std::string replica_envs::ROCKSDB_ITERATION_THRESHOLD_TIME_MS( - "replica.rocksdb_iteration_threshold_time_ms"); -const std::string replica_envs::ROCKSDB_BLOCK_CACHE_ENABLED("replica.rocksdb_block_cache_enabled"); -const std::string replica_envs::BUSINESS_INFO("business.info"); -const std::string replica_envs::REPLICA_ACCESS_CONTROLLER_ALLOWED_USERS( - "replica_access_controller.allowed_users"); -const std::string replica_envs::REPLICA_ACCESS_CONTROLLER_RANGER_POLICIES( - "replica_access_controller.ranger_policies"); -const std::string replica_envs::READ_QPS_THROTTLING("replica.read_throttling"); -const std::string replica_envs::READ_SIZE_THROTTLING("replica.read_throttling_by_size"); -const std::string - replica_envs::SPLIT_VALIDATE_PARTITION_HASH("replica.split.validate_partition_hash"); -const std::string replica_envs::USER_SPECIFIED_COMPACTION("user_specified_compaction"); -const std::string replica_envs::BACKUP_REQUEST_QPS_THROTTLING("replica.backup_request_throttling"); -const std::string replica_envs::ROCKSDB_ALLOW_INGEST_BEHIND("rocksdb.allow_ingest_behind"); -const std::string replica_envs::UPDATE_MAX_REPLICA_COUNT("max_replica_count.update"); -const std::string replica_envs::ROCKSDB_WRITE_BUFFER_SIZE("rocksdb.write_buffer_size"); -const std::string replica_envs::ROCKSDB_NUM_LEVELS("rocksdb.num_levels"); - -const std::set replica_envs::ROCKSDB_DYNAMIC_OPTIONS = { - replica_envs::ROCKSDB_WRITE_BUFFER_SIZE, -}; -const std::set replica_envs::ROCKSDB_STATIC_OPTIONS = { - replica_envs::ROCKSDB_NUM_LEVELS, -}; } // namespace replication } // namespace dsn diff --git a/src/common/test/common_test.cpp b/src/common/test/common_test.cpp index 2bc913fada..f645bbe705 100644 --- a/src/common/test/common_test.cpp +++ b/src/common/test/common_test.cpp @@ -19,8 +19,6 @@ #include "common/common.h" -#include - #include "gtest/gtest.h" namespace dsn { diff --git a/src/geo/test/geo_test.cpp b/src/geo/test/geo_test.cpp index 69074ac886..aacd668e14 100644 --- a/src/geo/test/geo_test.cpp +++ b/src/geo/test/geo_test.cpp @@ -34,8 +34,8 @@ #include #include -#include "base/pegasus_const.h" #include "client/replication_ddl_client.h" +#include "common/common.h" #include "common/replication_other_types.h" #include "geo/lib/geo_client.h" #include "gtest/gtest.h" @@ -63,7 +63,7 @@ class geo_client_test : public ::testing::Test { std::vector meta_list; bool ok = dsn::replication::replica_helper::load_meta_servers( - meta_list, PEGASUS_CLUSTER_SECTION_NAME.c_str(), "onebox"); + meta_list, dsn::PEGASUS_CLUSTER_SECTION_NAME.c_str(), "onebox"); CHECK(ok, "load_meta_servers failed"); auto ddl_client = new dsn::replication::replication_ddl_client(meta_list); dsn::error_code error = ddl_client->create_app("temp_geo", "pegasus", 4, 3, {}, false); diff --git a/src/ranger/ranger_resource_policy_manager.cpp b/src/ranger/ranger_resource_policy_manager.cpp index d9fb6c30dd..fe719a8987 100644 --- a/src/ranger/ranger_resource_policy_manager.cpp +++ b/src/ranger/ranger_resource_policy_manager.cpp @@ -582,8 +582,7 @@ dsn::error_code ranger_resource_policy_manager::sync_policies_to_app_envs() auto req = std::make_unique(); req->__set_app_name(app.app_name); - req->__set_keys( - {dsn::replication::replica_envs::REPLICA_ACCESS_CONTROLLER_RANGER_POLICIES}); + req->__set_keys({dsn::replica_envs::REPLICA_ACCESS_CONTROLLER_RANGER_POLICIES}); std::vector matched_database_table_policies; for (const auto &policy : table_policies->second) { // If this table does not match any database, this policy will be skipped and will not diff --git a/src/redis_protocol/proxy_lib/redis_parser.cpp b/src/redis_protocol/proxy_lib/redis_parser.cpp index baf9aed8b0..c0cf0bff09 100644 --- a/src/redis_protocol/proxy_lib/redis_parser.cpp +++ b/src/redis_protocol/proxy_lib/redis_parser.cpp @@ -32,7 +32,7 @@ #include #include -#include "base/pegasus_const.h" +#include "common/common.h" #include "common/replication_other_types.h" #include "pegasus/client.h" #include "rrdb/rrdb_types.h" @@ -98,7 +98,7 @@ redis_parser::redis_parser(proxy_stub *op, dsn::message_ex *first_msg) if (op) { std::vector meta_list; dsn::replication::replica_helper::load_meta_servers( - meta_list, PEGASUS_CLUSTER_SECTION_NAME.c_str(), op->get_cluster()); + meta_list, dsn::PEGASUS_CLUSTER_SECTION_NAME.c_str(), op->get_cluster()); r = new ::dsn::apps::rrdb_client(op->get_cluster(), meta_list, op->get_app()); if (!dsn::utils::is_empty(op->get_geo_app())) { _geo_client = std::make_unique( diff --git a/src/server/meta_store.cpp b/src/server/meta_store.cpp index 3ba5187a07..db0b9a0a47 100644 --- a/src/server/meta_store.cpp +++ b/src/server/meta_store.cpp @@ -22,7 +22,7 @@ #include #include -#include "pegasus_const.h" +#include "common/replica_envs.h" #include "server/pegasus_server_impl.h" #include "utils/fmt_logging.h" #include "utils/string_conv.h" @@ -34,6 +34,9 @@ const std::string meta_store::DATA_VERSION = "pegasus_data_version"; const std::string meta_store::LAST_FLUSHED_DECREE = "pegasus_last_flushed_decree"; const std::string meta_store::LAST_MANUAL_COMPACT_FINISH_TIME = "pegasus_last_manual_compact_finish_time"; +const std::string meta_store::ROCKSDB_ENV_USAGE_SCENARIO_NORMAL = "normal"; +const std::string meta_store::ROCKSDB_ENV_USAGE_SCENARIO_PREFER_WRITE = "prefer_write"; +const std::string meta_store::ROCKSDB_ENV_USAGE_SCENARIO_BULK_LOAD = "bulk_load"; meta_store::meta_store(pegasus_server_impl *server, rocksdb::DB *db, @@ -83,11 +86,12 @@ std::string meta_store::get_usage_scenario() const { // If couldn't find rocksdb usage scenario in meta column family, return normal in default. std::string usage_scenario = ROCKSDB_ENV_USAGE_SCENARIO_NORMAL; - auto ec = get_string_value_from_meta_cf(false, ROCKSDB_ENV_USAGE_SCENARIO_KEY, &usage_scenario); + auto ec = get_string_value_from_meta_cf( + false, dsn::replica_envs::ROCKSDB_USAGE_SCENARIO, &usage_scenario); CHECK_PREFIX_MSG(ec == ::dsn::ERR_OK || ec == ::dsn::ERR_OBJECT_NOT_FOUND, "rocksdb {} get {} from meta column family failed: {}", _db->GetName(), - ROCKSDB_ENV_USAGE_SCENARIO_KEY, + dsn::replica_envs::ROCKSDB_USAGE_SCENARIO, ec); return usage_scenario; } @@ -185,8 +189,9 @@ void meta_store::set_last_manual_compact_finish_time(uint64_t last_manual_compac void meta_store::set_usage_scenario(const std::string &usage_scenario) const { - CHECK_EQ_PREFIX(::dsn::ERR_OK, - set_string_value_to_meta_cf(ROCKSDB_ENV_USAGE_SCENARIO_KEY, usage_scenario)); + CHECK_EQ_PREFIX( + ::dsn::ERR_OK, + set_string_value_to_meta_cf(dsn::replica_envs::ROCKSDB_USAGE_SCENARIO, usage_scenario)); } } // namespace server diff --git a/src/server/meta_store.h b/src/server/meta_store.h index 247702657e..d8744a1ccd 100644 --- a/src/server/meta_store.h +++ b/src/server/meta_store.h @@ -19,6 +19,7 @@ #pragma once +#include #include #include #include @@ -78,13 +79,19 @@ class meta_store : public dsn::replication::replica_base const std::string &key, std::string *value); + friend class pegasus_server_impl; friend class pegasus_write_service; friend class rocksdb_wrapper; + FRIEND_TEST(pegasus_server_impl_test, test_open_db_with_latest_options); + FRIEND_TEST(pegasus_server_impl_test, test_open_db_with_app_envs); // Keys of meta data wrote into meta column family. static const std::string DATA_VERSION; static const std::string LAST_FLUSHED_DECREE; static const std::string LAST_MANUAL_COMPACT_FINISH_TIME; + static const std::string ROCKSDB_ENV_USAGE_SCENARIO_NORMAL; + static const std::string ROCKSDB_ENV_USAGE_SCENARIO_PREFER_WRITE; + static const std::string ROCKSDB_ENV_USAGE_SCENARIO_BULK_LOAD; rocksdb::DB *_db; rocksdb::ColumnFamilyHandle *_meta_cf; diff --git a/src/server/pegasus_manual_compact_service.cpp b/src/server/pegasus_manual_compact_service.cpp index e71f90e579..131c7e2e58 100644 --- a/src/server/pegasus_manual_compact_service.cpp +++ b/src/server/pegasus_manual_compact_service.cpp @@ -27,8 +27,8 @@ #include #include -#include "base/pegasus_const.h" #include "common/replication.codes.h" +#include "common/replica_envs.h" #include "pegasus_server_impl.h" #include "runtime/api_layer1.h" #include "runtime/task/async_calls.h" @@ -61,6 +61,11 @@ DSN_DEFINE_int32(pegasus.server, "minimal interval time in seconds to start a new manual compaction, <= 0 " "means no interval limit"); +const std::string + pegasus_manual_compact_service::MANUAL_COMPACT_BOTTOMMOST_LEVEL_COMPACTION_FORCE("force"); +const std::string + pegasus_manual_compact_service::MANUAL_COMPACT_BOTTOMMOST_LEVEL_COMPACTION_SKIP("skip"); + pegasus_manual_compact_service::pegasus_manual_compact_service(pegasus_server_impl *app) : replica_base(*app), _app(app), @@ -95,11 +100,11 @@ void pegasus_manual_compact_service::start_manual_compact_if_needed( std::string compact_rule; if (check_once_compact(envs)) { - compact_rule = MANUAL_COMPACT_ONCE_KEY_PREFIX; + compact_rule = dsn::replica_envs::MANUAL_COMPACT_ONCE_PREFIX; } if (compact_rule.empty() && check_periodic_compact(envs)) { - compact_rule = MANUAL_COMPACT_PERIODIC_KEY_PREFIX; + compact_rule = dsn::replica_envs::MANUAL_COMPACT_PERIODIC_PREFIX; } if (compact_rule.empty()) { @@ -124,7 +129,7 @@ bool pegasus_manual_compact_service::check_compact_disabled( const std::map &envs) { bool new_disabled = false; - auto find = envs.find(MANUAL_COMPACT_DISABLED_KEY); + auto find = envs.find(dsn::replica_envs::MANUAL_COMPACT_DISABLED); if (find != envs.end() && find->second == "true") { new_disabled = true; } @@ -148,7 +153,7 @@ int pegasus_manual_compact_service::check_compact_max_concurrent_running_count( const std::map &envs) { int new_count = INT_MAX; - auto find = envs.find(MANUAL_COMPACT_MAX_CONCURRENT_RUNNING_COUNT_KEY); + auto find = envs.find(dsn::replica_envs::MANUAL_COMPACT_MAX_CONCURRENT_RUNNING_COUNT); if (find != envs.end() && !dsn::buf2int32(find->second, new_count)) { LOG_ERROR_PREFIX("{}={} is invalid.", find->first, find->second); } @@ -166,7 +171,7 @@ int pegasus_manual_compact_service::check_compact_max_concurrent_running_count( bool pegasus_manual_compact_service::check_once_compact( const std::map &envs) { - auto find = envs.find(MANUAL_COMPACT_ONCE_TRIGGER_TIME_KEY); + auto find = envs.find(dsn::replica_envs::MANUAL_COMPACT_ONCE_TRIGGER_TIME); if (find == envs.end()) { return false; } @@ -183,7 +188,7 @@ bool pegasus_manual_compact_service::check_once_compact( bool pegasus_manual_compact_service::check_periodic_compact( const std::map &envs) { - auto find = envs.find(MANUAL_COMPACT_PERIODIC_TRIGGER_TIME_KEY); + auto find = envs.find(dsn::replica_envs::MANUAL_COMPACT_PERIODIC_TRIGGER_TIME); if (find == envs.end()) { return false; } @@ -236,7 +241,7 @@ void pegasus_manual_compact_service::extract_manual_compact_opts( options.exclusive_manual_compaction = true; options.change_level = true; options.target_level = -1; - auto find = envs.find(key_prefix + MANUAL_COMPACT_TARGET_LEVEL_KEY); + auto find = envs.find(key_prefix + dsn::replica_envs::MANUAL_COMPACT_TARGET_LEVEL); if (find != envs.end()) { int32_t target_level; if (dsn::buf2int32(find->second, target_level) && @@ -252,7 +257,7 @@ void pegasus_manual_compact_service::extract_manual_compact_opts( } options.bottommost_level_compaction = rocksdb::BottommostLevelCompaction::kSkip; - find = envs.find(key_prefix + MANUAL_COMPACT_BOTTOMMOST_LEVEL_COMPACTION_KEY); + find = envs.find(key_prefix + dsn::replica_envs::MANUAL_COMPACT_BOTTOMMOST_LEVEL_COMPACTION); if (find != envs.end()) { const std::string &argv = find->second; if (argv == MANUAL_COMPACT_BOTTOMMOST_LEVEL_COMPACTION_FORCE) { diff --git a/src/server/pegasus_manual_compact_service.h b/src/server/pegasus_manual_compact_service.h index d57343de27..329b10b7bf 100644 --- a/src/server/pegasus_manual_compact_service.h +++ b/src/server/pegasus_manual_compact_service.h @@ -19,6 +19,7 @@ #pragma once +#include #include #include #include @@ -84,6 +85,11 @@ class pegasus_manual_compact_service : public dsn::replication::replica_base uint64_t now_timestamp(); private: + FRIEND_TEST(manual_compact_service_test, extract_manual_compact_opts); + + static const std::string MANUAL_COMPACT_BOTTOMMOST_LEVEL_COMPACTION_FORCE; + static const std::string MANUAL_COMPACT_BOTTOMMOST_LEVEL_COMPACTION_SKIP; + pegasus_server_impl *_app; #ifdef PEGASUS_UNIT_TEST uint64_t _mock_now_timestamp = 0; diff --git a/src/server/pegasus_scan_context.h b/src/server/pegasus_scan_context.h index 7c256df0d2..ebf4399080 100644 --- a/src/server/pegasus_scan_context.h +++ b/src/server/pegasus_scan_context.h @@ -25,7 +25,6 @@ #include "utils/rand.h" #include -#include "base/pegasus_const.h" #include "base/pegasus_utils.h" namespace pegasus { @@ -71,6 +70,10 @@ struct pegasus_scan_context std::string _sort_key_filter_pattern_holder; public: + static const int SCAN_CONTEXT_ID_VALID_MIN = 0; + static const int SCAN_CONTEXT_ID_COMPLETED = -1; + static const int SCAN_CONTEXT_ID_NOT_EXIST = -2; + std::unique_ptr iterator; rocksdb::Slice stop; bool stop_inclusive; diff --git a/src/server/pegasus_server_impl.cpp b/src/server/pegasus_server_impl.cpp index 3fd549f81d..f4a95e3a61 100644 --- a/src/server/pegasus_server_impl.cpp +++ b/src/server/pegasus_server_impl.cpp @@ -50,13 +50,13 @@ #include "base/pegasus_utils.h" #include "base/pegasus_value_schema.h" #include "capacity_unit_calculator.h" +#include "common/replica_envs.h" #include "common/replication.codes.h" #include "common/replication_enums.h" #include "consensus_types.h" #include "dsn.layer2_types.h" #include "hotkey_collector.h" #include "meta_store.h" -#include "pegasus_const.h" #include "pegasus_rpc_types.h" #include "pegasus_server_write.h" #include "replica_admin_types.h" @@ -146,6 +146,47 @@ const std::string pegasus_server_impl::DATA_COLUMN_FAMILY_NAME = "default"; const std::string pegasus_server_impl::META_COLUMN_FAMILY_NAME = "pegasus_meta_cf"; const std::chrono::seconds pegasus_server_impl::kServerStatUpdateTimeSec = std::chrono::seconds(10); +// should be same with items in dsn::backup_restore_constant +const std::string ROCKSDB_ENV_RESTORE_FORCE_RESTORE("restore.force_restore"); +const std::string ROCKSDB_ENV_RESTORE_POLICY_NAME("restore.policy_name"); +const std::string ROCKSDB_ENV_RESTORE_BACKUP_ID("restore.backup_id"); +const std::string ROCKDB_CHECKPOINT_RESERVE_TIME_SECONDS("rocksdb.checkpoint.reserve_time_seconds"); + +using cf_opts_setter = std::function; +const std::unordered_map cf_opts_setters = { + {dsn::replica_envs::ROCKSDB_WRITE_BUFFER_SIZE, + [](const std::string &str, rocksdb::ColumnFamilyOptions &option) -> bool { + uint64_t val = 0; + if (!dsn::buf2uint64(str, val)) { + return false; + } + option.write_buffer_size = static_cast(val); + return true; + }}, + {dsn::replica_envs::ROCKSDB_NUM_LEVELS, + [](const std::string &str, rocksdb::ColumnFamilyOptions &option) -> bool { + int32_t val = 0; + if (!dsn::buf2int32(str, val)) { + return false; + } + option.num_levels = val; + return true; + }}, +}; + +using cf_opts_getter = + std::function; +const std::unordered_map cf_opts_getters = { + {dsn::replica_envs::ROCKSDB_WRITE_BUFFER_SIZE, + [](const rocksdb::ColumnFamilyOptions &option, /*out*/ std::string &str) { + str = std::to_string(option.write_buffer_size); + }}, + {dsn::replica_envs::ROCKSDB_NUM_LEVELS, + [](const rocksdb::ColumnFamilyOptions &option, /*out*/ std::string &str) { + str = std::to_string(option.num_levels); + }}, +}; + void pegasus_server_impl::parse_checkpoints() { std::vector dirs; @@ -1341,7 +1382,7 @@ void pegasus_server_impl::on_get_scanner(get_scanner_rpc rpc) std::chrono::minutes(5)); } else { // scan completed - resp.context_id = pegasus::SCAN_CONTEXT_ID_COMPLETED; + resp.context_id = pegasus_scan_context::SCAN_CONTEXT_ID_COMPLETED; } METRIC_VAR_INCREMENT_BY(read_expired_values, expire_count); @@ -1486,7 +1527,7 @@ void pegasus_server_impl::on_scan(scan_rpc rpc) std::chrono::minutes(5)); } else { // scan completed - resp.context_id = pegasus::SCAN_CONTEXT_ID_COMPLETED; + resp.context_id = pegasus_scan_context::SCAN_CONTEXT_ID_COMPLETED; } METRIC_VAR_INCREMENT_BY(read_expired_values, expire_count); @@ -2595,7 +2636,7 @@ void pegasus_server_impl::update_rocksdb_dynamic_options( } std::unordered_map new_options; - for (const auto &option : ROCKSDB_DYNAMIC_OPTIONS) { + for (const auto &option : dsn::replica_envs::ROCKSDB_DYNAMIC_OPTIONS) { const auto &find = envs.find(option); if (find == envs.end()) { continue; @@ -2621,7 +2662,7 @@ void pegasus_server_impl::set_rocksdb_options_before_creating( return; } - for (const auto &option : pegasus::ROCKSDB_STATIC_OPTIONS) { + for (const auto &option : dsn::replica_envs::ROCKSDB_STATIC_OPTIONS) { const auto &find = envs.find(option); if (find == envs.end()) { continue; @@ -2634,7 +2675,7 @@ void pegasus_server_impl::set_rocksdb_options_before_creating( } } - for (const auto &option : pegasus::ROCKSDB_DYNAMIC_OPTIONS) { + for (const auto &option : dsn::replica_envs::ROCKSDB_DYNAMIC_OPTIONS) { const auto &find = envs.find(option); if (find == envs.end()) { continue; @@ -2679,23 +2720,24 @@ void pegasus_server_impl::update_app_envs_before_open_db( void pegasus_server_impl::query_app_envs(/*out*/ std::map &envs) { - envs[ROCKSDB_ENV_USAGE_SCENARIO_KEY] = _usage_scenario; + envs[dsn::replica_envs::ROCKSDB_USAGE_SCENARIO] = _usage_scenario; // write_buffer_size involves random values (refer to pegasus_server_impl::set_usage_scenario), // so it can only be taken from _data_cf_opts - envs[ROCKSDB_WRITE_BUFFER_SIZE] = std::to_string(_data_cf_opts.write_buffer_size); + envs[dsn::replica_envs::ROCKSDB_WRITE_BUFFER_SIZE] = + std::to_string(_data_cf_opts.write_buffer_size); // Get Data ColumnFamilyOptions directly from _data_cf rocksdb::ColumnFamilyDescriptor desc; CHECK_TRUE(_data_cf->GetDescriptor(&desc).ok()); - for (const auto &option : pegasus::ROCKSDB_STATIC_OPTIONS) { + for (const auto &option : dsn::replica_envs::ROCKSDB_STATIC_OPTIONS) { auto getter = cf_opts_getters.find(option); CHECK_TRUE(getter != cf_opts_getters.end()); std::string option_val; getter->second(desc.options, option_val); envs[option] = option_val; } - for (const auto &option : pegasus::ROCKSDB_DYNAMIC_OPTIONS) { - if (option.compare(ROCKSDB_WRITE_BUFFER_SIZE) == 0) { + for (const auto &option : dsn::replica_envs::ROCKSDB_DYNAMIC_OPTIONS) { + if (option.compare(dsn::replica_envs::ROCKSDB_WRITE_BUFFER_SIZE) == 0) { continue; } auto getter = cf_opts_getters.find(option); @@ -2710,19 +2752,19 @@ void pegasus_server_impl::update_usage_scenario(const std::mapsecond : ROCKSDB_ENV_USAGE_SCENARIO_NORMAL); + (find != envs.end() ? find->second : meta_store::ROCKSDB_ENV_USAGE_SCENARIO_NORMAL); if (new_usage_scenario != _usage_scenario) { std::string old_usage_scenario = _usage_scenario; if (set_usage_scenario(new_usage_scenario)) { LOG_INFO_PREFIX("update app env[{}] from \"{}\" to \"{}\" succeed", - ROCKSDB_ENV_USAGE_SCENARIO_KEY, + dsn::replica_envs::ROCKSDB_USAGE_SCENARIO, old_usage_scenario, new_usage_scenario); } else { LOG_ERROR_PREFIX("update app env[{}] from \"{}\" to \"{}\" failed", - ROCKSDB_ENV_USAGE_SCENARIO_KEY, + dsn::replica_envs::ROCKSDB_USAGE_SCENARIO, old_usage_scenario, new_usage_scenario); } @@ -2735,7 +2777,7 @@ void pegasus_server_impl::update_usage_scenario(const std::map &envs) { - auto find = envs.find(TABLE_LEVEL_DEFAULT_TTL); + auto find = envs.find(dsn::replica_envs::TABLE_LEVEL_DEFAULT_TTL); if (find != envs.end()) { int32_t ttl = 0; if (!dsn::buf2int32(find->second, ttl) || ttl < 0) { @@ -2753,7 +2795,7 @@ void pegasus_server_impl::update_checkpoint_reserve(const std::mapsecond, count) || count <= 0) { LOG_ERROR_PREFIX("{}={} is invalid.", find->first, find->second); @@ -2770,7 +2812,7 @@ void pegasus_server_impl::update_checkpoint_reserve(const std::mapparse_from_env(find->second, get_app_info()->partition_count, @@ -2798,7 +2840,7 @@ void pegasus_server_impl::update_throttling_controller( throttling_changed, old_throttling)) { LOG_WARNING_PREFIX("parse env failed, key = \"{}\", value = \"{}\", error = \"{}\"", - READ_SIZE_THROTTLING, + dsn::replica_envs::READ_SIZE_THROTTLING, find->second, parse_error); // reset if parse failed @@ -2810,7 +2852,7 @@ void pegasus_server_impl::update_throttling_controller( } if (throttling_changed) { LOG_INFO_PREFIX("switch {} from \"{}\" to \"{}\"", - READ_SIZE_THROTTLING, + dsn::replica_envs::READ_SIZE_THROTTLING, old_throttling, _read_size_throttling_controller->env_value()); } @@ -2820,7 +2862,7 @@ void pegasus_server_impl::update_slow_query_threshold( const std::map &envs) { uint64_t threshold_ns = FLAGS_rocksdb_slow_query_threshold_ns; - auto find = envs.find(ROCKSDB_ENV_SLOW_QUERY_THRESHOLD); + auto find = envs.find(dsn::replica_envs::SLOW_QUERY_THRESHOLD); if (find != envs.end()) { // get slow query from env(the unit of slow query from env is ms) uint64_t threshold_ms; @@ -2834,7 +2876,7 @@ void pegasus_server_impl::update_slow_query_threshold( // check if they are changed if (_slow_query_threshold_ns != threshold_ns) { LOG_INFO_PREFIX("update app env[{}] from \"{}\" to \"{}\" succeed", - ROCKSDB_ENV_SLOW_QUERY_THRESHOLD, + dsn::replica_envs::SLOW_QUERY_THRESHOLD, _slow_query_threshold_ns, threshold_ns); _slow_query_threshold_ns = threshold_ns; @@ -2845,7 +2887,7 @@ void pegasus_server_impl::update_rocksdb_iteration_threshold( const std::map &envs) { uint64_t threshold_ms = FLAGS_rocksdb_iteration_threshold_time_ms; - auto find = envs.find(ROCKSDB_ITERATION_THRESHOLD_TIME_MS); + auto find = envs.find(dsn::replica_envs::ROCKSDB_ITERATION_THRESHOLD_TIME_MS); if (find != envs.end()) { // the unit of iteration threshold from env is ms if (!dsn::buf2uint64(find->second, threshold_ms) || threshold_ms < 0) { @@ -2856,7 +2898,7 @@ void pegasus_server_impl::update_rocksdb_iteration_threshold( if (_rng_rd_opts.rocksdb_iteration_threshold_time_ms != threshold_ms) { LOG_INFO_PREFIX("update app env[{}] from \"{}\" to \"{}\" succeed", - ROCKSDB_ITERATION_THRESHOLD_TIME_MS, + dsn::replica_envs::ROCKSDB_ITERATION_THRESHOLD_TIME_MS, _rng_rd_opts.rocksdb_iteration_threshold_time_ms, threshold_ms); _rng_rd_opts.rocksdb_iteration_threshold_time_ms = threshold_ms; @@ -2868,7 +2910,7 @@ void pegasus_server_impl::update_rocksdb_block_cache_enabled( { // default of ReadOptions:fill_cache is true bool cache_enabled = true; - auto find = envs.find(ROCKSDB_BLOCK_CACHE_ENABLED); + auto find = envs.find(dsn::replica_envs::ROCKSDB_BLOCK_CACHE_ENABLED); if (find != envs.end()) { if (!dsn::buf2bool(find->second, cache_enabled)) { LOG_ERROR_PREFIX("{}={} is invalid.", find->first, find->second); @@ -2878,7 +2920,7 @@ void pegasus_server_impl::update_rocksdb_block_cache_enabled( if (_data_cf_rd_opts.fill_cache != cache_enabled) { LOG_INFO_PREFIX("update app env[{}] from \"{}\" to \"{}\" succeed", - ROCKSDB_BLOCK_CACHE_ENABLED, + dsn::replica_envs::ROCKSDB_BLOCK_CACHE_ENABLED, _data_cf_rd_opts.fill_cache, cache_enabled); _data_cf_rd_opts.fill_cache = cache_enabled; @@ -2889,7 +2931,7 @@ void pegasus_server_impl::update_validate_partition_hash( const std::map &envs) { bool new_value = false; - auto iter = envs.find(SPLIT_VALIDATE_PARTITION_HASH); + auto iter = envs.find(dsn::replica_envs::SPLIT_VALIDATE_PARTITION_HASH); if (iter != envs.end()) { if (!dsn::buf2bool(iter->second, new_value)) { LOG_ERROR_PREFIX("{}={} is invalid.", iter->first, iter->second); @@ -2907,7 +2949,7 @@ void pegasus_server_impl::update_validate_partition_hash( void pegasus_server_impl::update_user_specified_compaction( const std::map &envs) { - auto iter = envs.find(USER_SPECIFIED_COMPACTION); + auto iter = envs.find(dsn::replica_envs::USER_SPECIFIED_COMPACTION); if (dsn_unlikely(iter == envs.end() && _user_specified_compaction != "")) { LOG_INFO_PREFIX("clear user specified compaction coz it was deleted"); _key_ttl_compaction_filter_factory->clear_user_specified_ops(); @@ -2925,7 +2967,7 @@ void pegasus_server_impl::update_user_specified_compaction( bool pegasus_server_impl::parse_allow_ingest_behind(const std::map &envs) { bool allow_ingest_behind = false; - const auto &iter = envs.find(ROCKSDB_ALLOW_INGEST_BEHIND); + const auto &iter = envs.find(dsn::replica_envs::ROCKSDB_ALLOW_INGEST_BEHIND); if (iter == envs.end()) { return allow_ingest_behind; } @@ -3022,9 +3064,9 @@ bool pegasus_server_impl::set_usage_scenario(const std::string &usage_scenario) return false; std::string old_usage_scenario = _usage_scenario; std::unordered_map new_options; - if (usage_scenario == ROCKSDB_ENV_USAGE_SCENARIO_NORMAL || - usage_scenario == ROCKSDB_ENV_USAGE_SCENARIO_PREFER_WRITE) { - if (_usage_scenario == ROCKSDB_ENV_USAGE_SCENARIO_BULK_LOAD) { + if (usage_scenario == meta_store::ROCKSDB_ENV_USAGE_SCENARIO_NORMAL || + usage_scenario == meta_store::ROCKSDB_ENV_USAGE_SCENARIO_PREFER_WRITE) { + if (_usage_scenario == meta_store::ROCKSDB_ENV_USAGE_SCENARIO_BULK_LOAD) { // old usage scenario is bulk load, reset first new_options["level0_file_num_compaction_trigger"] = std::to_string(_data_cf_opts.level0_file_num_compaction_trigger); @@ -3044,12 +3086,12 @@ bool pegasus_server_impl::set_usage_scenario(const std::string &usage_scenario) std::to_string(_data_cf_opts.max_write_buffer_number); } - if (usage_scenario == ROCKSDB_ENV_USAGE_SCENARIO_NORMAL) { + if (usage_scenario == meta_store::ROCKSDB_ENV_USAGE_SCENARIO_NORMAL) { new_options["write_buffer_size"] = std::to_string(get_random_nearby(_data_cf_opts.write_buffer_size)); new_options["level0_file_num_compaction_trigger"] = std::to_string(_data_cf_opts.level0_file_num_compaction_trigger); - } else { // ROCKSDB_ENV_USAGE_SCENARIO_PREFER_WRITE + } else { // meta_store::ROCKSDB_ENV_USAGE_SCENARIO_PREFER_WRITE uint64_t buffer_size = dsn::rand::next_u64(_data_cf_opts.write_buffer_size, _data_cf_opts.write_buffer_size * 2); new_options["write_buffer_size"] = std::to_string(buffer_size); @@ -3057,7 +3099,7 @@ bool pegasus_server_impl::set_usage_scenario(const std::string &usage_scenario) new_options["level0_file_num_compaction_trigger"] = std::to_string(std::max(4UL, max_size / buffer_size)); } - } else if (usage_scenario == ROCKSDB_ENV_USAGE_SCENARIO_BULK_LOAD) { + } else if (usage_scenario == meta_store::ROCKSDB_ENV_USAGE_SCENARIO_BULK_LOAD) { // refer to Options::PrepareForBulkLoad() new_options["level0_file_num_compaction_trigger"] = "1000000000"; new_options["level0_slowdown_writes_trigger"] = "1000000000"; @@ -3177,9 +3219,9 @@ void pegasus_server_impl::recalculate_data_cf_options( return; } std::unordered_map new_options; - if (ROCKSDB_ENV_USAGE_SCENARIO_NORMAL == _usage_scenario || - ROCKSDB_ENV_USAGE_SCENARIO_PREFER_WRITE == _usage_scenario) { - if (ROCKSDB_ENV_USAGE_SCENARIO_NORMAL == _usage_scenario) { + if (meta_store::ROCKSDB_ENV_USAGE_SCENARIO_NORMAL == _usage_scenario || + meta_store::ROCKSDB_ENV_USAGE_SCENARIO_PREFER_WRITE == _usage_scenario) { + if (meta_store::ROCKSDB_ENV_USAGE_SCENARIO_NORMAL == _usage_scenario) { UPDATE_OPTION_IF_NOT_NEARBY(write_buffer_size, _data_cf_opts.write_buffer_size); UPDATE_OPTION_IF_NEEDED(level0_file_num_compaction_trigger); } else { diff --git a/src/server/test/manual_compact_service_test.cpp b/src/server/test/manual_compact_service_test.cpp index a97bf957eb..9dc2acaea2 100644 --- a/src/server/test/manual_compact_service_test.cpp +++ b/src/server/test/manual_compact_service_test.cpp @@ -24,8 +24,8 @@ #include #include +#include "common/replica_envs.h" #include "gtest/gtest.h" -#include "pegasus_const.h" #include "pegasus_server_test_base.h" #include "runtime/api_layer1.h" #include "server/pegasus_manual_compact_service.h" @@ -115,22 +115,22 @@ TEST_P(manual_compact_service_test, check_compact_disabled) std::map envs; check_compact_disabled(envs, false); - envs[MANUAL_COMPACT_DISABLED_KEY] = ""; + envs[dsn::replica_envs::MANUAL_COMPACT_DISABLED] = ""; check_compact_disabled(envs, false); - envs[MANUAL_COMPACT_DISABLED_KEY] = "true"; + envs[dsn::replica_envs::MANUAL_COMPACT_DISABLED] = "true"; check_compact_disabled(envs, true); - envs[MANUAL_COMPACT_DISABLED_KEY] = "false"; + envs[dsn::replica_envs::MANUAL_COMPACT_DISABLED] = "false"; check_compact_disabled(envs, false); - envs[MANUAL_COMPACT_DISABLED_KEY] = "1"; + envs[dsn::replica_envs::MANUAL_COMPACT_DISABLED] = "1"; check_compact_disabled(envs, false); - envs[MANUAL_COMPACT_DISABLED_KEY] = "0"; + envs[dsn::replica_envs::MANUAL_COMPACT_DISABLED] = "0"; check_compact_disabled(envs, false); - envs[MANUAL_COMPACT_DISABLED_KEY] = "abc"; + envs[dsn::replica_envs::MANUAL_COMPACT_DISABLED] = "abc"; check_compact_disabled(envs, false); } @@ -143,27 +143,27 @@ TEST_P(manual_compact_service_test, check_once_compact) std::map envs; check_once_compact(envs, false); - envs[MANUAL_COMPACT_ONCE_TRIGGER_TIME_KEY] = ""; + envs[dsn::replica_envs::MANUAL_COMPACT_ONCE_TRIGGER_TIME] = ""; check_once_compact(envs, false); - envs[MANUAL_COMPACT_ONCE_TRIGGER_TIME_KEY] = "abc"; + envs[dsn::replica_envs::MANUAL_COMPACT_ONCE_TRIGGER_TIME] = "abc"; check_once_compact(envs, false); - envs[MANUAL_COMPACT_ONCE_TRIGGER_TIME_KEY] = "-1"; + envs[dsn::replica_envs::MANUAL_COMPACT_ONCE_TRIGGER_TIME] = "-1"; check_once_compact(envs, false); // has been compacted - envs[MANUAL_COMPACT_ONCE_TRIGGER_TIME_KEY] = std::to_string(compacted_ts - 1); + envs[dsn::replica_envs::MANUAL_COMPACT_ONCE_TRIGGER_TIME] = std::to_string(compacted_ts - 1); check_once_compact(envs, false); - envs[MANUAL_COMPACT_ONCE_TRIGGER_TIME_KEY] = std::to_string(compacted_ts); + envs[dsn::replica_envs::MANUAL_COMPACT_ONCE_TRIGGER_TIME] = std::to_string(compacted_ts); check_once_compact(envs, false); // has not been compacted - envs[MANUAL_COMPACT_ONCE_TRIGGER_TIME_KEY] = std::to_string(compacted_ts + 1); + envs[dsn::replica_envs::MANUAL_COMPACT_ONCE_TRIGGER_TIME] = std::to_string(compacted_ts + 1); check_once_compact(envs, true); - envs[MANUAL_COMPACT_ONCE_TRIGGER_TIME_KEY] = std::to_string(dsn_now_ms() / 1000); + envs[dsn::replica_envs::MANUAL_COMPACT_ONCE_TRIGGER_TIME] = std::to_string(dsn_now_ms() / 1000); check_once_compact(envs, true); } @@ -174,36 +174,36 @@ TEST_P(manual_compact_service_test, check_periodic_compact) // invalid trigger time format check_periodic_compact(envs, false); - envs[MANUAL_COMPACT_PERIODIC_TRIGGER_TIME_KEY] = ""; + envs[dsn::replica_envs::MANUAL_COMPACT_PERIODIC_TRIGGER_TIME] = ""; check_periodic_compact(envs, false); - envs[MANUAL_COMPACT_PERIODIC_TRIGGER_TIME_KEY] = ","; + envs[dsn::replica_envs::MANUAL_COMPACT_PERIODIC_TRIGGER_TIME] = ","; check_periodic_compact(envs, false); - envs[MANUAL_COMPACT_PERIODIC_TRIGGER_TIME_KEY] = "12:oo"; + envs[dsn::replica_envs::MANUAL_COMPACT_PERIODIC_TRIGGER_TIME] = "12:oo"; check_periodic_compact(envs, false); - envs[MANUAL_COMPACT_PERIODIC_TRIGGER_TIME_KEY] = std::to_string(compacted_ts); + envs[dsn::replica_envs::MANUAL_COMPACT_PERIODIC_TRIGGER_TIME] = std::to_string(compacted_ts); check_periodic_compact(envs, false); // suppose compacted at 10:00 set_compact_time(dsn::utils::hh_mm_today_to_unix_sec("10:00")); // has been compacted - envs[MANUAL_COMPACT_PERIODIC_TRIGGER_TIME_KEY] = "9:00"; + envs[dsn::replica_envs::MANUAL_COMPACT_PERIODIC_TRIGGER_TIME] = "9:00"; check_periodic_compact(envs, false); - envs[MANUAL_COMPACT_PERIODIC_TRIGGER_TIME_KEY] = "3:00,9:00"; + envs[dsn::replica_envs::MANUAL_COMPACT_PERIODIC_TRIGGER_TIME] = "3:00,9:00"; check_periodic_compact(envs, false); - envs[MANUAL_COMPACT_PERIODIC_TRIGGER_TIME_KEY] = "10:00"; + envs[dsn::replica_envs::MANUAL_COMPACT_PERIODIC_TRIGGER_TIME] = "10:00"; check_periodic_compact(envs, false); // suppose compacted at 09:00 set_compact_time(dsn::utils::hh_mm_today_to_unix_sec("09:00")); // single compact time - envs[MANUAL_COMPACT_PERIODIC_TRIGGER_TIME_KEY] = "10:00"; + envs[dsn::replica_envs::MANUAL_COMPACT_PERIODIC_TRIGGER_TIME] = "10:00"; set_mock_now((uint64_t)dsn::utils::hh_mm_today_to_unix_sec("08:00")); check_periodic_compact(envs, false); @@ -215,7 +215,7 @@ TEST_P(manual_compact_service_test, check_periodic_compact) check_periodic_compact(envs, true); // multiple compact time - envs[MANUAL_COMPACT_PERIODIC_TRIGGER_TIME_KEY] = "10:00,21:00"; + envs[dsn::replica_envs::MANUAL_COMPACT_PERIODIC_TRIGGER_TIME] = "10:00,21:00"; set_mock_now((uint64_t)dsn::utils::hh_mm_today_to_unix_sec("08:00")); check_periodic_compact(envs, false); @@ -253,33 +253,36 @@ TEST_P(manual_compact_service_test, extract_manual_compact_opts) std::map envs; rocksdb::CompactRangeOptions out; - extract_manual_compact_opts(envs, MANUAL_COMPACT_ONCE_KEY_PREFIX, out); + extract_manual_compact_opts(envs, dsn::replica_envs::MANUAL_COMPACT_ONCE_PREFIX, out); ASSERT_EQ(out.target_level, -1); ASSERT_EQ(out.bottommost_level_compaction, rocksdb::BottommostLevelCompaction::kSkip); - envs[MANUAL_COMPACT_ONCE_KEY_PREFIX + MANUAL_COMPACT_TARGET_LEVEL_KEY] = "2"; - envs[MANUAL_COMPACT_ONCE_KEY_PREFIX + MANUAL_COMPACT_BOTTOMMOST_LEVEL_COMPACTION_KEY] = - MANUAL_COMPACT_BOTTOMMOST_LEVEL_COMPACTION_FORCE; - extract_manual_compact_opts(envs, MANUAL_COMPACT_ONCE_KEY_PREFIX, out); + envs[dsn::replica_envs::MANUAL_COMPACT_ONCE_PREFIX + + dsn::replica_envs::MANUAL_COMPACT_TARGET_LEVEL] = "2"; + envs[dsn::replica_envs::MANUAL_COMPACT_ONCE_BOTTOMMOST_LEVEL_COMPACTION] = + pegasus_manual_compact_service::MANUAL_COMPACT_BOTTOMMOST_LEVEL_COMPACTION_FORCE; + extract_manual_compact_opts(envs, dsn::replica_envs::MANUAL_COMPACT_ONCE_PREFIX, out); ASSERT_EQ(out.target_level, 2); ASSERT_EQ(out.bottommost_level_compaction, rocksdb::BottommostLevelCompaction::kForce); - envs[MANUAL_COMPACT_ONCE_KEY_PREFIX + MANUAL_COMPACT_TARGET_LEVEL_KEY] = "-1"; - envs[MANUAL_COMPACT_ONCE_KEY_PREFIX + MANUAL_COMPACT_BOTTOMMOST_LEVEL_COMPACTION_KEY] = - MANUAL_COMPACT_BOTTOMMOST_LEVEL_COMPACTION_SKIP; - extract_manual_compact_opts(envs, MANUAL_COMPACT_ONCE_KEY_PREFIX, out); + envs[dsn::replica_envs::MANUAL_COMPACT_ONCE_PREFIX + + dsn::replica_envs::MANUAL_COMPACT_TARGET_LEVEL] = "-1"; + envs[dsn::replica_envs::MANUAL_COMPACT_ONCE_BOTTOMMOST_LEVEL_COMPACTION] = + pegasus_manual_compact_service::MANUAL_COMPACT_BOTTOMMOST_LEVEL_COMPACTION_SKIP; + extract_manual_compact_opts(envs, dsn::replica_envs::MANUAL_COMPACT_ONCE_PREFIX, out); ASSERT_EQ(out.target_level, -1); ASSERT_EQ(out.bottommost_level_compaction, rocksdb::BottommostLevelCompaction::kSkip); - envs[MANUAL_COMPACT_ONCE_KEY_PREFIX + MANUAL_COMPACT_TARGET_LEVEL_KEY] = "-2"; - envs[MANUAL_COMPACT_ONCE_KEY_PREFIX + MANUAL_COMPACT_BOTTOMMOST_LEVEL_COMPACTION_KEY] = - "nonono"; - extract_manual_compact_opts(envs, MANUAL_COMPACT_ONCE_KEY_PREFIX, out); + envs[dsn::replica_envs::MANUAL_COMPACT_ONCE_PREFIX + + dsn::replica_envs::MANUAL_COMPACT_TARGET_LEVEL] = "-2"; + envs[dsn::replica_envs::MANUAL_COMPACT_ONCE_BOTTOMMOST_LEVEL_COMPACTION] = "nonono"; + extract_manual_compact_opts(envs, dsn::replica_envs::MANUAL_COMPACT_ONCE_PREFIX, out); ASSERT_EQ(out.target_level, -1); ASSERT_EQ(out.bottommost_level_compaction, rocksdb::BottommostLevelCompaction::kSkip); - envs[MANUAL_COMPACT_ONCE_KEY_PREFIX + MANUAL_COMPACT_TARGET_LEVEL_KEY] = "8"; - extract_manual_compact_opts(envs, MANUAL_COMPACT_ONCE_KEY_PREFIX, out); + envs[dsn::replica_envs::MANUAL_COMPACT_ONCE_PREFIX + + dsn::replica_envs::MANUAL_COMPACT_TARGET_LEVEL] = "8"; + extract_manual_compact_opts(envs, dsn::replica_envs::MANUAL_COMPACT_ONCE_PREFIX, out); ASSERT_EQ(out.target_level, -1); } diff --git a/src/server/test/pegasus_server_impl_test.cpp b/src/server/test/pegasus_server_impl_test.cpp index 17671c0db1..0508e7ce56 100644 --- a/src/server/test/pegasus_server_impl_test.cpp +++ b/src/server/test/pegasus_server_impl_test.cpp @@ -28,13 +28,14 @@ #include #include +#include "common/replica_envs.h" #include "gmock/gmock.h" #include "gtest/gtest.h" -#include "pegasus_const.h" #include "pegasus_server_test_base.h" #include "rrdb/rrdb.code.definition.h" #include "rrdb/rrdb_types.h" #include "runtime/serverlet.h" +#include "server/meta_store.h" #include "server/pegasus_read_service.h" #include "utils/autoref_ptr.h" #include "utils/blob.h" @@ -72,7 +73,8 @@ class pegasus_server_impl_test : public pegasus_server_test_base // set table level slow query threshold std::map envs; _server->query_app_envs(envs); - envs[ROCKSDB_ENV_SLOW_QUERY_THRESHOLD] = std::to_string(test.slow_query_threshold_ms); + envs[dsn::replica_envs::SLOW_QUERY_THRESHOLD] = + std::to_string(test.slow_query_threshold_ms); _server->update_app_envs(envs); // do on_get/on_multi_get operation, @@ -113,10 +115,10 @@ class pegasus_server_impl_test : public pegasus_server_test_base for (const auto &test : tests) { all_test_envs[test.env_key] = test.env_value; } - for (const auto &option : pegasus::ROCKSDB_DYNAMIC_OPTIONS) { + for (const auto &option : dsn::replica_envs::ROCKSDB_DYNAMIC_OPTIONS) { ASSERT_TRUE(all_test_envs.find(option) != all_test_envs.end()); } - for (const auto &option : pegasus::ROCKSDB_STATIC_OPTIONS) { + for (const auto &option : dsn::replica_envs::ROCKSDB_STATIC_OPTIONS) { ASSERT_TRUE(all_test_envs.find(option) != all_test_envs.end()); } } @@ -158,17 +160,17 @@ TEST_P(pegasus_server_impl_test, test_open_db_with_latest_options) { // open a new db with no app env. ASSERT_EQ(dsn::ERR_OK, start()); - ASSERT_EQ(ROCKSDB_ENV_USAGE_SCENARIO_NORMAL, _server->_usage_scenario); + ASSERT_EQ(meta_store::ROCKSDB_ENV_USAGE_SCENARIO_NORMAL, _server->_usage_scenario); // set bulk_load scenario for the db. - ASSERT_TRUE(_server->set_usage_scenario(ROCKSDB_ENV_USAGE_SCENARIO_BULK_LOAD)); - ASSERT_EQ(ROCKSDB_ENV_USAGE_SCENARIO_BULK_LOAD, _server->_usage_scenario); + ASSERT_TRUE(_server->set_usage_scenario(meta_store::ROCKSDB_ENV_USAGE_SCENARIO_BULK_LOAD)); + ASSERT_EQ(meta_store::ROCKSDB_ENV_USAGE_SCENARIO_BULK_LOAD, _server->_usage_scenario); rocksdb::Options opts = _server->_db->GetOptions(); ASSERT_EQ(1000000000, opts.level0_file_num_compaction_trigger); ASSERT_EQ(true, opts.disable_auto_compactions); // reopen the db. ASSERT_EQ(dsn::ERR_OK, _server->stop(false)); ASSERT_EQ(dsn::ERR_OK, start()); - ASSERT_EQ(ROCKSDB_ENV_USAGE_SCENARIO_BULK_LOAD, _server->_usage_scenario); + ASSERT_EQ(meta_store::ROCKSDB_ENV_USAGE_SCENARIO_BULK_LOAD, _server->_usage_scenario); ASSERT_EQ(opts.level0_file_num_compaction_trigger, _server->_db->GetOptions().level0_file_num_compaction_trigger); ASSERT_EQ(opts.disable_auto_compactions, _server->_db->GetOptions().disable_auto_compactions); @@ -177,9 +179,10 @@ TEST_P(pegasus_server_impl_test, test_open_db_with_latest_options) TEST_P(pegasus_server_impl_test, test_open_db_with_app_envs) { std::map envs; - envs[ROCKSDB_ENV_USAGE_SCENARIO_KEY] = ROCKSDB_ENV_USAGE_SCENARIO_BULK_LOAD; + envs[dsn::replica_envs::ROCKSDB_USAGE_SCENARIO] = + meta_store::ROCKSDB_ENV_USAGE_SCENARIO_BULK_LOAD; ASSERT_EQ(dsn::ERR_OK, start(envs)); - ASSERT_EQ(ROCKSDB_ENV_USAGE_SCENARIO_BULK_LOAD, _server->_usage_scenario); + ASSERT_EQ(meta_store::ROCKSDB_ENV_USAGE_SCENARIO_BULK_LOAD, _server->_usage_scenario); } TEST_P(pegasus_server_impl_test, test_open_db_with_rocksdb_envs) @@ -219,7 +222,7 @@ TEST_P(pegasus_server_impl_test, test_update_user_specified_compaction) ASSERT_EQ("", _server->_user_specified_compaction); std::string user_specified_compaction = "test"; - envs[USER_SPECIFIED_COMPACTION] = user_specified_compaction; + envs[dsn::replica_envs::USER_SPECIFIED_COMPACTION] = user_specified_compaction; _server->update_user_specified_compaction(envs); ASSERT_EQ(user_specified_compaction, _server->_user_specified_compaction); } diff --git a/src/shell/main.cpp b/src/shell/main.cpp index 75a52abc63..186fb92bab 100644 --- a/src/shell/main.cpp +++ b/src/shell/main.cpp @@ -31,10 +31,10 @@ #include #include "args.h" -#include "base/pegasus_const.h" #include "client/replication_ddl_client.h" #include "command_executor.h" #include "commands.h" +#include "common/common.h" #include "common/replication_other_types.h" #include "pegasus/client.h" #include "runtime/app_model.h" @@ -658,15 +658,14 @@ static void freeHintsCallback(void *ptr) { sdsfree((sds)ptr); } { s_global_context.current_cluster_name = cluster_name; std::string server_list = - dsn_config_get_value_string(pegasus::PEGASUS_CLUSTER_SECTION_NAME.c_str(), + dsn_config_get_value_string(dsn::PEGASUS_CLUSTER_SECTION_NAME.c_str(), s_global_context.current_cluster_name.c_str(), "", ""); - dsn::replication::replica_helper::load_meta_servers( - s_global_context.meta_list, - pegasus::PEGASUS_CLUSTER_SECTION_NAME.c_str(), - cluster_name.c_str()); + dsn::replication::replica_helper::load_meta_servers(s_global_context.meta_list, + dsn::PEGASUS_CLUSTER_SECTION_NAME.c_str(), + cluster_name.c_str()); s_global_context.ddl_client = std::make_unique(s_global_context.meta_list); diff --git a/src/test/function_test/backup_restore/test_backup_and_restore.cpp b/src/test/function_test/backup_restore/test_backup_and_restore.cpp index 35fb0f6bd1..3eab10c7a3 100644 --- a/src/test/function_test/backup_restore/test_backup_and_restore.cpp +++ b/src/test/function_test/backup_restore/test_backup_and_restore.cpp @@ -21,7 +21,6 @@ #include #include "backup_types.h" -#include "base/pegasus_const.h" #include "client/replication_ddl_client.h" #include "gtest/gtest.h" #include "test/function_test/utils/test_util.h" diff --git a/src/test/function_test/base_api/test_batch_get.cpp b/src/test/function_test/base_api/test_batch_get.cpp index da4f9bf3ca..f196fc7bde 100644 --- a/src/test/function_test/base_api/test_batch_get.cpp +++ b/src/test/function_test/base_api/test_batch_get.cpp @@ -28,7 +28,6 @@ #include #include -#include "base/pegasus_const.h" #include "base/pegasus_key_schema.h" #include "client/partition_resolver.h" #include "gtest/gtest.h" diff --git a/src/test/function_test/base_api/test_scan.cpp b/src/test/function_test/base_api/test_scan.cpp index 2aaf43022c..bda35b25ec 100644 --- a/src/test/function_test/base_api/test_scan.cpp +++ b/src/test/function_test/base_api/test_scan.cpp @@ -29,9 +29,9 @@ #include #include -#include "base/pegasus_const.h" #include "base/pegasus_utils.h" #include "client/replication_ddl_client.h" +#include "common/replica_envs.h" #include "gtest/gtest.h" #include "include/pegasus/client.h" #include "pegasus/error.h" @@ -422,7 +422,8 @@ TEST_F(scan_test, REQUEST_EXPIRE_TS) TEST_F(scan_test, ITERATION_TIME_LIMIT) { // update iteration threshold to 1ms - NO_FATALS(update_table_env({ROCKSDB_ITERATION_THRESHOLD_TIME_MS}, {std::to_string(1)})); + NO_FATALS(update_table_env({dsn::replica_envs::ROCKSDB_ITERATION_THRESHOLD_TIME_MS}, + {std::to_string(1)})); // write data into table int32_t i = 0; @@ -442,5 +443,6 @@ TEST_F(scan_test, ITERATION_TIME_LIMIT) ASSERT_EQ(-1, count); // set iteration threshold to 100ms - NO_FATALS(update_table_env({ROCKSDB_ITERATION_THRESHOLD_TIME_MS}, {std::to_string(100)})); + NO_FATALS(update_table_env({dsn::replica_envs::ROCKSDB_ITERATION_THRESHOLD_TIME_MS}, + {std::to_string(100)})); } diff --git a/src/test/function_test/base_api/test_ttl.cpp b/src/test/function_test/base_api/test_ttl.cpp index 8c6f2b8452..aeb708b151 100644 --- a/src/test/function_test/base_api/test_ttl.cpp +++ b/src/test/function_test/base_api/test_ttl.cpp @@ -25,8 +25,8 @@ #include #include -#include "base/pegasus_const.h" #include "client/replication_ddl_client.h" +#include "common/replica_envs.h" #include "gtest/gtest.h" #include "include/pegasus/client.h" #include "pegasus/error.h" @@ -54,9 +54,10 @@ class ttl_test : public test_util std::map envs; ASSERT_EQ(ERR_OK, ddl_client_->get_app_envs(client_->get_app_name(), envs)); - std::string env = envs[TABLE_LEVEL_DEFAULT_TTL]; + std::string env = envs[dsn::replica_envs::TABLE_LEVEL_DEFAULT_TTL]; if ((env.empty() && ttl != 0) || env != std::to_string(ttl)) { - NO_FATALS(update_table_env({TABLE_LEVEL_DEFAULT_TTL}, {std::to_string(ttl)})); + NO_FATALS(update_table_env({dsn::replica_envs::TABLE_LEVEL_DEFAULT_TTL}, + {std::to_string(ttl)})); } } @@ -118,8 +119,8 @@ TEST_F(ttl_test, set_without_default_ttl) ASSERT_EQ(ttl_test_value_2, value); // trigger a manual compaction - NO_FATALS( - update_table_env({MANUAL_COMPACT_ONCE_TRIGGER_TIME_KEY}, {std::to_string(time(nullptr))})); + NO_FATALS(update_table_env({dsn::replica_envs::MANUAL_COMPACT_ONCE_TRIGGER_TIME}, + {std::to_string(time(nullptr))})); // check expired one ASSERT_EQ(PERR_NOT_FOUND, client_->ttl(ttl_hash_key, ttl_test_sort_key_1, ttl_seconds)); @@ -186,8 +187,8 @@ TEST_F(ttl_test, set_with_default_ttl) ASSERT_EQ(ttl_test_value_2, value); // trigger a manual compaction - NO_FATALS( - update_table_env({MANUAL_COMPACT_ONCE_TRIGGER_TIME_KEY}, {std::to_string(time(nullptr))})); + NO_FATALS(update_table_env({dsn::replica_envs::MANUAL_COMPACT_ONCE_TRIGGER_TIME}, + {std::to_string(time(nullptr))})); // check forever one ASSERT_EQ(PERR_OK, client_->ttl(ttl_hash_key, ttl_test_sort_key_0, ttl_seconds)); diff --git a/src/test/function_test/bulk_load/test_bulk_load.cpp b/src/test/function_test/bulk_load/test_bulk_load.cpp index 85db39c7b3..8e191264f7 100644 --- a/src/test/function_test/bulk_load/test_bulk_load.cpp +++ b/src/test/function_test/bulk_load/test_bulk_load.cpp @@ -30,7 +30,6 @@ #include #include -#include "base/pegasus_const.h" #include "base/pegasus_key_schema.h" #include "base/pegasus_utils.h" #include "base/pegasus_value_schema.h" @@ -39,6 +38,7 @@ #include "client/partition_resolver.h" #include "client/replication_ddl_client.h" #include "common/bulk_load_common.h" +#include "common/replica_envs.h" #include "gtest/gtest.h" #include "include/pegasus/client.h" // IWYU pragma: keep #include "meta/meta_bulk_load_service.h" @@ -342,7 +342,7 @@ TEST_F(bulk_load_test, missing_p0_bulk_load_metadata) // Test bulk load failed because the allow_ingest_behind config is inconsistent. TEST_F(bulk_load_test, allow_ingest_behind_inconsistent) { - NO_FATALS(update_table_env({ROCKSDB_ALLOW_INGEST_BEHIND}, {"false"})); + NO_FATALS(update_table_env({dsn::replica_envs::ROCKSDB_ALLOW_INGEST_BEHIND}, {"false"})); ASSERT_EQ(ERR_INCONSISTENT_STATE, start_bulk_load(/* ingest_behind */ true)); } @@ -353,6 +353,6 @@ TEST_F(bulk_load_test, normal) { check_bulk_load(/* ingest_behind */ false); } // load data. TEST_F(bulk_load_test, allow_ingest_behind) { - NO_FATALS(update_table_env({ROCKSDB_ALLOW_INGEST_BEHIND}, {"true"})); + NO_FATALS(update_table_env({dsn::replica_envs::ROCKSDB_ALLOW_INGEST_BEHIND}, {"true"})); check_bulk_load(/* ingest_behind */ true); } diff --git a/src/test/function_test/restore/test_restore.cpp b/src/test/function_test/restore/test_restore.cpp index 107051dfd2..42e13161a4 100644 --- a/src/test/function_test/restore/test_restore.cpp +++ b/src/test/function_test/restore/test_restore.cpp @@ -31,7 +31,6 @@ #include #include -#include "base/pegasus_const.h" #include "client/partition_resolver.h" #include "client/replication_ddl_client.h" #include "common/gpid.h" diff --git a/src/test/function_test/utils/test_util.cpp b/src/test/function_test/utils/test_util.cpp index 5d061582c6..e757f75eb0 100644 --- a/src/test/function_test/utils/test_util.cpp +++ b/src/test/function_test/utils/test_util.cpp @@ -30,8 +30,8 @@ #include #include -#include "base/pegasus_const.h" #include "client/replication_ddl_client.h" +#include "common/common.h" #include "common/replication_other_types.h" #include "fmt/core.h" #include "gtest/gtest.h" @@ -83,7 +83,7 @@ void test_util::SetUpTestCase() { ASSERT_TRUE(pegasus_client_factory::initialize void test_util::SetUp() { ASSERT_TRUE(replica_helper::load_meta_servers( - meta_list_, PEGASUS_CLUSTER_SECTION_NAME.c_str(), kClusterName.c_str())); + meta_list_, dsn::PEGASUS_CLUSTER_SECTION_NAME.c_str(), kClusterName.c_str())); ASSERT_FALSE(meta_list_.empty()); ddl_client_ = std::make_shared(meta_list_); diff --git a/src/test/kill_test/kill_testor.cpp b/src/test/kill_test/kill_testor.cpp index 6c1019aef1..4c9b23a279 100644 --- a/src/test/kill_test/kill_testor.cpp +++ b/src/test/kill_test/kill_testor.cpp @@ -27,8 +27,8 @@ #include #include -#include "base/pegasus_const.h" #include "client/replication_ddl_client.h" +#include "common/common.h" #include "common/gpid.h" #include "common/replication_other_types.h" #include "kill_testor.h" @@ -54,7 +54,7 @@ kill_testor::kill_testor(const char *config_file) // load meta_list dsn::replication::replica_helper::load_meta_servers( - meta_list, PEGASUS_CLUSTER_SECTION_NAME.c_str(), FLAGS_pegasus_cluster_name); + meta_list, dsn::PEGASUS_CLUSTER_SECTION_NAME.c_str(), FLAGS_pegasus_cluster_name); if (meta_list.empty()) { LOG_ERROR("Should config the meta address for killer"); exit(-1); From 546b947ecafea13cbca1139c7b1877e845e8cfcb Mon Sep 17 00:00:00 2001 From: Dan Wang Date: Tue, 6 Feb 2024 19:25:56 +0800 Subject: [PATCH 2/3] fix(scripts): drop unnecessary dependencies for pack_tools.sh (#1901) https://github.com/apache/incubator-pegasus/issues/1900 Now that snappy/zstd/lz4 has not been the direct dependencies of Pegasus(actually all of them is the dependency of rocksdb), just remove them from pack_tools.sh. --- scripts/pack_tools.sh | 3 --- 1 file changed, 3 deletions(-) diff --git a/scripts/pack_tools.sh b/scripts/pack_tools.sh index ae7706867c..ce386b3a49 100755 --- a/scripts/pack_tools.sh +++ b/scripts/pack_tools.sh @@ -140,11 +140,8 @@ pack_tools_lib() { pack_system_lib "${pack}/lib" shell "$1" } -pack_tools_lib snappy pack_tools_lib crypto pack_tools_lib ssl -pack_tools_lib zstd -pack_tools_lib lz4 chmod -x ${pack}/lib/* From 858433dd7f6fa38f4b6a61634dbc0f820b05f960 Mon Sep 17 00:00:00 2001 From: Dan Wang Date: Wed, 7 Feb 2024 10:44:08 +0800 Subject: [PATCH 3/3] feat(new_metrics): show replica-level qps and capacity units by shell `nodes` command based on new metrics (#1899) --- src/shell/command_helper.h | 58 +++++++++++++ src/shell/commands/node_management.cpp | 104 +++++++++++------------- src/shell/commands/table_management.cpp | 2 - src/utils/metrics.cpp | 12 ++- src/utils/metrics.h | 6 +- src/utils/test/metrics_test.cpp | 1 + 6 files changed, 119 insertions(+), 64 deletions(-) diff --git a/src/shell/command_helper.h b/src/shell/command_helper.h index e5ac4395bd..43a2e05b0b 100644 --- a/src/shell/command_helper.h +++ b/src/shell/command_helper.h @@ -704,6 +704,64 @@ inline std::vector get_metrics(const std::vector &n } \ } while (0) +using stat_var_map = std::unordered_map; +inline dsn::error_s calc_metric_deltas(const std::string &json_string_1, + const std::string &json_string_2, + const std::string &entity_type, + stat_var_map &incs, + stat_var_map &rates) +{ + // Currently only Gauge and Counter are considered to have "increase" and "rate", thus brief + // `value` field is enough. + DESERIALIZE_METRIC_QUERY_BRIEF_SNAPSHOT(value, json_string_1, query_snapshot_1); + DESERIALIZE_METRIC_QUERY_BRIEF_SNAPSHOT(value, json_string_2, query_snapshot_2); + + if (query_snapshot_2.timestamp_ns <= query_snapshot_1.timestamp_ns) { + return FMT_ERR(dsn::ERR_INVALID_DATA, + "duration for metric samples should be > 0: timestamp_ns_1={}, " + "timestamp_ns_2={}", + query_snapshot_1.timestamp_ns, + query_snapshot_2.timestamp_ns); + } + + const std::vector stat_vars = {&incs, &rates}; + +#define CALC_STAT_VAR(op) \ + do { \ + if (entity.type != entity_type) { \ + continue; \ + } \ + \ + for (const auto &m : entity.metrics) { \ + for (auto &stat : stat_vars) { \ + auto iter = stat->find(m.name); \ + if (iter != stat->end()) { \ + *iter->second op m.value; \ + } \ + } \ + } \ + } while (0) + + for (const auto &entity : query_snapshot_2.entities) { + CALC_STAT_VAR(+=); + } + + for (const auto &entity : query_snapshot_1.entities) { + CALC_STAT_VAR(-=); + } + +#undef CALC_STAT_VAR + + const std::chrono::duration duration_ns( + static_cast(query_snapshot_2.timestamp_ns - query_snapshot_1.timestamp_ns)); + const std::chrono::duration duration_s = duration_ns; + for (auto &rate : rates) { + *rate.second /= duration_s.count(); + } + + return dsn::error_s::ok(); +} + inline std::vector> call_remote_command(shell_context *sc, const std::vector &nodes, diff --git a/src/shell/commands/node_management.cpp b/src/shell/commands/node_management.cpp index e9d2d9f97a..abb06ca295 100644 --- a/src/shell/commands/node_management.cpp +++ b/src/shell/commands/node_management.cpp @@ -17,36 +17,36 @@ * under the License. */ -// IWYU pragma: no_include #include #include #include #include #include +// IWYU pragma: no_include +#include #include #include #include #include #include +#include #include #include #include #include "client/replication_ddl_client.h" -#include "common/json_helper.h" #include "common/replication_enums.h" #include "dsn.layer2_types.h" #include "meta_admin_types.h" -#include "perf_counter/perf_counter_utils.h" #include "runtime/rpc/rpc_address.h" #include "shell/command_executor.h" #include "shell/command_helper.h" #include "shell/command_utils.h" #include "shell/commands.h" #include "shell/sds/sds.h" -#include "utils/blob.h" #include "utils/error_code.h" #include "utils/errors.h" +#include "utils/flags.h" #include "utils/math.h" #include "utils/metrics.h" #include "utils/output_utils.h" @@ -54,6 +54,8 @@ #include "utils/strings.h" #include "utils/utils.h" +DSN_DEFINE_uint32(shell, nodes_sample_interval_ms, 1000, "The interval between sampling metrics."); + bool query_cluster_info(command_executor *e, shell_context *sc, arguments args) { static struct option long_options[] = {{"resolve_ip", no_argument, 0, 'r'}, @@ -111,8 +113,6 @@ dsn::metric_filters resource_usage_filters() dsn::error_s parse_resource_usage(const std::string &json_string, list_nodes_helper &stat) { - dsn::error_s err; - DESERIALIZE_METRIC_QUERY_BRIEF_SNAPSHOT(value, json_string, query_snapshot); int64_t total_capacity_mb = 0; @@ -172,8 +172,6 @@ dsn::metric_filters profiler_latency_filters() dsn::error_s parse_profiler_latency(const std::string &json_string, list_nodes_helper &stat) { - dsn::error_s err; - DESERIALIZE_METRIC_QUERY_BRIEF_SNAPSHOT(p99, json_string, query_snapshot); for (const auto &entity : query_snapshot.entities) { @@ -214,6 +212,21 @@ dsn::error_s parse_profiler_latency(const std::string &json_string, list_nodes_h return dsn::error_s::ok(); } +dsn::metric_filters rw_requests_filters() +{ + dsn::metric_filters filters; + filters.with_metric_fields = {dsn::kMetricNameField, dsn::kMetricSingleValueField}; + filters.entity_types = {"replica"}; + filters.entity_metrics = {"get_requests", + "multi_get_requests", + "batch_get_requests", + "put_requests", + "multi_put_requests", + "read_capacity_units", + "write_capacity_units"}; + return filters; +} + } // anonymous namespace bool ls_nodes(command_executor *e, shell_context *sc, arguments args) @@ -377,57 +390,32 @@ bool ls_nodes(command_executor *e, shell_context *sc, arguments args) return true; } - std::vector> results = - call_remote_command(sc, - nodes, - "perf-counters-by-prefix", - {"replica*app.pegasus*get_qps", - "replica*app.pegasus*multi_get_qps", - "replica*app.pegasus*batch_get_qps", - "replica*app.pegasus*put_qps", - "replica*app.pegasus*multi_put_qps", - "replica*app.pegasus*recent.read.cu", - "replica*app.pegasus*recent.write.cu"}); - - for (int i = 0; i < nodes.size(); ++i) { - dsn::rpc_address node_addr = nodes[i].address; - auto tmp_it = tmp_map.find(node_addr); - if (tmp_it == tmp_map.end()) + const auto &results_1 = get_metrics(nodes, rw_requests_filters().to_query_string()); + std::this_thread::sleep_for(std::chrono::milliseconds(FLAGS_nodes_sample_interval_ms)); + const auto &results_2 = get_metrics(nodes, rw_requests_filters().to_query_string()); + + for (size_t i = 0; i < nodes.size(); ++i) { + auto tmp_it = tmp_map.find(nodes[i].address); + if (tmp_it == tmp_map.end()) { continue; - if (!results[i].first) { - std::cout << "query perf counter info from node " << node_addr << " failed" - << std::endl; - return true; - } - dsn::perf_counter_info info; - dsn::blob bb(results[i].second.data(), 0, results[i].second.size()); - if (!dsn::json::json_forwarder::decode(bb, info)) { - std::cout << "decode perf counter info from node " << node_addr - << " failed, result = " << results[i].second << std::endl; - return true; - } - if (info.result != "OK") { - std::cout << "query perf counter info from node " << node_addr - << " returns error, error = " << info.result << std::endl; - return true; - } - list_nodes_helper &h = tmp_it->second; - for (dsn::perf_counter_metric &m : info.counters) { - if (m.name.find("replica*app.pegasus*get_qps") != std::string::npos) - h.get_qps += m.value; - else if (m.name.find("replica*app.pegasus*multi_get_qps") != std::string::npos) - h.multi_get_qps += m.value; - else if (m.name.find("replica*app.pegasus*batch_get_qps") != std::string::npos) - h.batch_get_qps += m.value; - else if (m.name.find("replica*app.pegasus*put_qps") != std::string::npos) - h.put_qps += m.value; - else if (m.name.find("replica*app.pegasus*multi_put_qps") != std::string::npos) - h.multi_put_qps += m.value; - else if (m.name.find("replica*app.pegasus*recent.read.cu") != std::string::npos) - h.read_cu += m.value; - else if (m.name.find("replica*app.pegasus*recent.write.cu") != std::string::npos) - h.write_cu += m.value; } + + RETURN_SHELL_IF_GET_METRICS_FAILED(results_1[i], nodes[i], "1st rw requests"); + RETURN_SHELL_IF_GET_METRICS_FAILED(results_2[i], nodes[i], "2nd rw requests"); + + list_nodes_helper &stat = tmp_it->second; + stat_var_map incs = {{"read_capacity_units", &stat.read_cu}, + {"write_capacity_units", &stat.write_cu}}; + stat_var_map rates = {{"get_requests", &stat.get_qps}, + {"multi_get_requests", &stat.multi_get_qps}, + {"batch_get_requests", &stat.batch_get_qps}, + {"put_requests", &stat.put_qps}, + {"multi_put_requests", &stat.multi_put_qps}}; + RETURN_SHELL_IF_PARSE_METRICS_FAILED( + calc_metric_deltas( + results_1[i].body(), results_2[i].body(), "replica", incs, rates), + nodes[i], + "rw requests"); } } @@ -440,7 +428,7 @@ bool ls_nodes(command_executor *e, shell_context *sc, arguments args) const auto &results = get_metrics(nodes, profiler_latency_filters().to_query_string()); - for (int i = 0; i < nodes.size(); ++i) { + for (size_t i = 0; i < nodes.size(); ++i) { auto tmp_it = tmp_map.find(nodes[i].address); if (tmp_it == tmp_map.end()) { continue; diff --git a/src/shell/commands/table_management.cpp b/src/shell/commands/table_management.cpp index e6167da3f1..b5a5e34a15 100644 --- a/src/shell/commands/table_management.cpp +++ b/src/shell/commands/table_management.cpp @@ -187,8 +187,6 @@ dsn::error_s parse_sst_stat(const std::string &json_string, std::map &count_map, std::map &disk_map) { - dsn::error_s err; - DESERIALIZE_METRIC_QUERY_BRIEF_SNAPSHOT(value, json_string, query_snapshot); for (const auto &entity : query_snapshot.entities) { diff --git a/src/utils/metrics.cpp b/src/utils/metrics.cpp index 681b6555ca..09d08c4c9a 100644 --- a/src/utils/metrics.cpp +++ b/src/utils/metrics.cpp @@ -520,6 +520,13 @@ void encode_port(dsn::metric_json_writer &writer) ENCODE_OBJ_VAL(rpc != nullptr, rpc->primary_address().port()); } +void encode_timestamp_ns(dsn::metric_json_writer &writer) +{ + writer.Key(dsn::kMetricTimestampNsField.c_str()); + + ENCODE_OBJ_VAL(true, dsn_now_ns()); +} + #undef ENCODE_OBJ_VAL } // anonymous namespace @@ -549,6 +556,7 @@ void metric_registry::take_snapshot(metric_json_writer &writer, const metric_fil encode_role(writer); encode_host(writer); encode_port(writer); + encode_timestamp_ns(writer); encode_entities(writer, filters); writer.EndObject(); } @@ -557,7 +565,7 @@ metric_registry::collected_entities_info metric_registry::collect_stale_entities { collected_entities_info collected_info; - auto now = dsn_now_ms(); + const auto now = dsn_now_ms(); utils::auto_read_lock l(_lock); @@ -596,7 +604,7 @@ metric_registry::retire_stale_entities(const collected_entity_list &collected_en retired_entities_stat retired_stat; - auto now = dsn_now_ms(); + const auto now = dsn_now_ms(); utils::auto_write_lock l(_lock); diff --git a/src/utils/metrics.h b/src/utils/metrics.h index fb88aaeb0d..449bbf1a25 100644 --- a/src/utils/metrics.h +++ b/src/utils/metrics.h @@ -353,6 +353,7 @@ const std::string kMetricClusterField = "cluster"; const std::string kMetricRoleField = "role"; const std::string kMetricHostField = "host"; const std::string kMetricPortField = "port"; +const std::string kMetricTimestampNsField = "timestamp_ns"; const std::string kMetricEntitiesField = "entities"; class metric_entity : public ref_counter @@ -1679,9 +1680,10 @@ class auto_count std::string role; \ std::string host; \ uint16_t port; \ + uint64_t timestamp_ns; \ std::vector entities; \ \ - DEFINE_JSON_SERIALIZATION(cluster, role, host, port, entities) \ + DEFINE_JSON_SERIALIZATION(cluster, role, host, port, timestamp_ns, entities) \ } #define DEF_ALL_METRIC_BRIEF_SNAPSHOTS(field) \ @@ -1700,7 +1702,7 @@ DEF_ALL_METRIC_BRIEF_SNAPSHOTS(p99); if (dsn_unlikely( \ !dsn::json::json_forwarder::decode( \ bb, query_snapshot))) { \ - return FMT_ERR(dsn::ERR_INVALID_DATA, "invalid json string"); \ + return FMT_ERR(dsn::ERR_INVALID_DATA, "invalid json string: {}", json_string); \ } \ } while (0) diff --git a/src/utils/test/metrics_test.cpp b/src/utils/test/metrics_test.cpp index cbacc6a7b1..640f8267e0 100644 --- a/src/utils/test/metrics_test.cpp +++ b/src/utils/test/metrics_test.cpp @@ -2271,6 +2271,7 @@ const std::unordered_set kAllMetricQueryFields = {kMetricClusterFie kMetricRoleField, kMetricHostField, kMetricPortField, + kMetricTimestampNsField, kMetricEntitiesField}; void check_entity_ids_from_json_string(const std::string &json_string,