Skip to content

Commit

Permalink
shell: use rocksdb's histogram for shell key-value size statistics (#216
Browse files Browse the repository at this point in the history
)

* shell command `count_data -z` could show more information, such as key/value/row size median, p99, p95, etc.
  • Loading branch information
acelyc111 committed Dec 14, 2018
1 parent dd7868d commit 342cf71
Show file tree
Hide file tree
Showing 5 changed files with 91 additions and 131 deletions.
6 changes: 4 additions & 2 deletions src/geo/bench/bench.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,14 @@
#include "geo/lib/geo_client.h"

#include <iostream>

#include <s2/s2testing.h>
#include <s2/s2cell.h>
#include "monitoring/histogram.h"
#include <monitoring/histogram.h>
#include <rocksdb/env.h>

#include <dsn/utility/strings.h>
#include <dsn/utility/string_conv.h>
#include <rocksdb/env.h>

static const int data_count = 10000;

Expand Down
28 changes: 14 additions & 14 deletions src/server/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,11 @@ set(MY_PROJ_SRC "")
# "GLOB" for non-recursive search
set(MY_SRC_SEARCH_MODE "GLOB")

set(MY_PROJ_INC_PATH "")
set(MY_PROJ_INC_PATH
"${PEGASUS_PROJECT_DIR}/rocksdb")

set(MY_PROJ_LIB_PATH
"${PEGASUS_PROJECT_DIR}/rocksdb/build")

set(MY_PROJ_LIBS
dsn_replica_server
Expand All @@ -32,26 +36,22 @@ set(MY_PROJ_LIBS
PocoJSON
crypto
fmt
)

if (UNIX)
set(MY_PROJ_LIBS rocksdb ${MY_PROJ_LIBS} z bz2 snappy rt aio pthread)
else()
set(MY_PROJ_LIBS rocksdblib ${MY_PROJ_LIBS} rpcrt4)
endif()

set(MY_PROJ_LIB_PATH "../../rocksdb/build")
rocksdb
z
bz2
snappy
rt
aio
pthread)

set(MY_BOOST_PACKAGES system filesystem)

set(MY_BINPLACES "${CMAKE_CURRENT_SOURCE_DIR}/config.ini")

add_definitions(-Wno-attributes)

if (UNIX)
SET(CMAKE_INSTALL_RPATH ".")
SET(CMAKE_BUILD_WITH_INSTALL_RPATH TRUE)
endif()
SET(CMAKE_INSTALL_RPATH ".")
SET(CMAKE_BUILD_WITH_INSTALL_RPATH TRUE)

dsn_add_executable()
dsn_install_executable()
30 changes: 16 additions & 14 deletions src/shell/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,13 @@ set(MY_PROJ_SRC "linenoise/linenoise.c" "sds/sds.c")
# "GLOB" for non-recursive search
set(MY_SRC_SEARCH_MODE "GLOB")

set(MY_PROJ_INC_PATH "../include" "../base")
set(MY_PROJ_INC_PATH
"${PEGASUS_PROJECT_DIR}/src/include"
"${PEGASUS_PROJECT_DIR}/src/base"
"${PEGASUS_PROJECT_DIR}/rocksdb")

set(MY_PROJ_LIB_PATH
"${PEGASUS_PROJECT_DIR}/rocksdb/build")

set(MY_PROJ_LIBS
dsn.replication.tool
Expand All @@ -31,24 +37,20 @@ set(MY_PROJ_LIBS
fmt
pegasus_geo_lib
s2
)

if (UNIX)
set(MY_PROJ_LIBS rocksdb ${MY_PROJ_LIBS} z bz2 snappy rt aio pthread)
else()
set(MY_PROJ_LIBS rocksdblib ${MY_PROJ_LIBS} rpcrt4)
endif()

set(MY_PROJ_LIB_PATH "../../rocksdb/build")
rocksdb
z
bz2
snappy
rt
aio
pthread)

set(MY_BINPLACES "${CMAKE_CURRENT_SOURCE_DIR}/config.ini")

set(MY_BOOST_PACKAGES system filesystem)

if (UNIX)
SET(CMAKE_INSTALL_RPATH ".")
SET(CMAKE_BUILD_WITH_INSTALL_RPATH TRUE)
endif()
SET(CMAKE_INSTALL_RPATH ".")
SET(CMAKE_BUILD_WITH_INSTALL_RPATH TRUE)

add_definitions(-Wno-attributes)

Expand Down
35 changes: 14 additions & 21 deletions src/shell/command_helper.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
#include <boost/algorithm/string.hpp>
#include <rocksdb/db.h>
#include <rocksdb/sst_dump_tool.h>
#include <rocksdb/env.h>
#include <monitoring/histogram.h>
#include <dsn/dist/cli/cli.client.h>
#include <dsn/dist/replication/replication_ddl_client.h>
#include <dsn/dist/replication/mutation_log_tool.h>
Expand Down Expand Up @@ -107,13 +109,10 @@ struct scan_data_context
std::atomic_long split_request_count;
std::atomic_bool split_completed;
bool stat_size;
std::atomic_long hash_key_size_sum;
std::atomic_long hash_key_size_max;
std::atomic_long sort_key_size_sum;
std::atomic_long sort_key_size_max;
std::atomic_long value_size_sum;
std::atomic_long value_size_max;
std::atomic_long row_size_max;
rocksdb::HistogramImpl hash_key_size_histogram;
rocksdb::HistogramImpl sort_key_size_histogram;
rocksdb::HistogramImpl value_size_histogram;
rocksdb::HistogramImpl row_size_histogram;
int top_count;
top_container top_rows;
scan_data_context(scan_data_operator op_,
Expand All @@ -138,13 +137,6 @@ struct scan_data_context
split_request_count(0),
split_completed(false),
stat_size(stat_size_),
hash_key_size_sum(0),
hash_key_size_max(0),
sort_key_size_sum(0),
sort_key_size_max(0),
value_size_sum(0),
value_size_max(0),
row_size_max(0),
top_count(top_count_),
top_rows(top_count_)
{
Expand Down Expand Up @@ -224,16 +216,17 @@ inline void scan_data_next(scan_data_context *context)
context->split_rows++;
if (context->stat_size) {
long hash_key_size = hash_key.size();
context->hash_key_size_sum += hash_key_size;
update_atomic_max(context->hash_key_size_max, hash_key_size);
context->hash_key_size_histogram.Add(hash_key_size);

long sort_key_size = sort_key.size();
context->sort_key_size_sum += sort_key_size;
update_atomic_max(context->sort_key_size_max, sort_key_size);
context->sort_key_size_histogram.Add(sort_key_size);

long value_size = value.size();
context->value_size_sum += value_size;
update_atomic_max(context->value_size_max, value_size);
context->value_size_histogram.Add(value_size);

long row_size = hash_key_size + sort_key_size + value_size;
update_atomic_max(context->row_size_max, row_size);
context->row_size_histogram.Add(row_size);

if (context->top_count > 0) {
context->top_rows.push(
std::move(hash_key), std::move(sort_key), row_size);
Expand Down
123 changes: 43 additions & 80 deletions src/shell/commands.h
Original file line number Diff line number Diff line change
Expand Up @@ -2530,6 +2530,46 @@ inline bool clear_data(command_executor *e, shell_context *sc, arguments args)
return true;
}

static void print_simple_histogram(const std::string &name, const rocksdb::HistogramImpl &histogram)
{
fprintf(stderr, "[%s]\n", name.c_str());
fprintf(stderr, " max = %ld\n", histogram.max());
fprintf(stderr, " med = %.2f\n", histogram.Median());
fprintf(stderr, " avg = %.2f\n", histogram.Average());
fprintf(stderr, " min = %ld\n", histogram.min());
fprintf(stderr, " P99 = %.2f\n", histogram.Percentile(99.0));
fprintf(stderr, " P95 = %.2f\n", histogram.Percentile(95.0));
fprintf(stderr, " P90 = %.2f\n", histogram.Percentile(90.0));
}

static void print_current_scan_state(const std::vector<scan_data_context *> &contexts,
const std::string &stop_desc,
bool stat_size)
{
rocksdb::HistogramImpl hash_key_size_histogram;
rocksdb::HistogramImpl sort_key_size_histogram;
rocksdb::HistogramImpl value_size_histogram;
rocksdb::HistogramImpl row_size_histogram;
for (const auto &context : contexts) {
hash_key_size_histogram.Merge(context->hash_key_size_histogram);
sort_key_size_histogram.Merge(context->sort_key_size_histogram);
value_size_histogram.Merge(context->value_size_histogram);
row_size_histogram.Merge(context->row_size_histogram);
fprintf(stderr,
"INFO: split[%d]: %ld rows\n",
context->split_id,
context->row_size_histogram.num());
}
fprintf(stderr, "Count %s, total %ld rows.\n\n", stop_desc.c_str(), row_size_histogram.num());

if (stat_size) {
print_simple_histogram("hash_key_size", hash_key_size_histogram);
print_simple_histogram("sort_key_size", sort_key_size_histogram);
print_simple_histogram("value_size", value_size_histogram);
print_simple_histogram("row_size", row_size_histogram);
}
}

inline bool count_data(command_executor *e, shell_context *sc, arguments args)
{
static struct option long_options[] = {{"max_split_count", required_argument, 0, 's'},
Expand Down Expand Up @@ -2702,46 +2742,7 @@ inline bool count_data(command_executor *e, shell_context *sc, arguments args)
break;
last_total_rows = cur_total_rows;
if (stat_size && sleep_seconds % 10 == 0) {
long total_rows = 0;
long hash_key_size_sum = 0;
long hash_key_size_max = 0;
long sort_key_size_sum = 0;
long sort_key_size_max = 0;
long value_size_sum = 0;
long value_size_max = 0;
long row_size_max = 0;
for (int i = 0; i < scanners.size(); i++) {
total_rows += contexts[i]->split_rows.load();
hash_key_size_sum += contexts[i]->hash_key_size_sum.load();
hash_key_size_max =
std::max(contexts[i]->hash_key_size_max.load(), hash_key_size_max);
sort_key_size_sum += contexts[i]->sort_key_size_sum.load();
sort_key_size_max =
std::max(contexts[i]->sort_key_size_max.load(), sort_key_size_max);
value_size_sum += contexts[i]->value_size_sum.load();
value_size_max = std::max(contexts[i]->value_size_max.load(), value_size_max);
row_size_max = std::max(contexts[i]->row_size_max.load(), row_size_max);
}
long row_size_sum = hash_key_size_sum + sort_key_size_sum + value_size_sum;
double hash_key_size_avg =
total_rows == 0 ? 0.0 : (double)hash_key_size_sum / total_rows;
double sort_key_size_avg =
total_rows == 0 ? 0.0 : (double)sort_key_size_sum / total_rows;
double value_size_avg = total_rows == 0 ? 0.0 : (double)value_size_sum / total_rows;
double row_size_avg = total_rows == 0 ? 0.0 : (double)row_size_sum / total_rows;
fprintf(stderr, "[row].count = %ld\n", total_rows);
fprintf(stderr, "[hash_key].size_sum = %ld\n", hash_key_size_sum);
fprintf(stderr, "[hash_key].size_max = %ld\n", hash_key_size_max);
fprintf(stderr, "[hash_key].size_avg = %.2f\n", hash_key_size_avg);
fprintf(stderr, "[sort_key].size_sum = %ld\n", sort_key_size_sum);
fprintf(stderr, "[sort_key].size_max = %ld\n", sort_key_size_max);
fprintf(stderr, "[sort_key].size_avg = %.2f\n", sort_key_size_avg);
fprintf(stderr, "[value].size_sum = %ld\n", value_size_sum);
fprintf(stderr, "[value].size_max = %ld\n", value_size_max);
fprintf(stderr, "[value].size_avg = %.2f\n", value_size_avg);
fprintf(stderr, "[row].size_sum = %ld\n", row_size_sum);
fprintf(stderr, "[row].size_max = %ld\n", row_size_max);
fprintf(stderr, "[row].size_avg = %.2f\n", row_size_avg);
print_current_scan_state(contexts, "partially", stat_size);
}
}

Expand All @@ -2753,28 +2754,6 @@ inline bool count_data(command_executor *e, shell_context *sc, arguments args)
}
}

long total_rows = 0;
long hash_key_size_sum = 0;
long hash_key_size_max = 0;
long sort_key_size_sum = 0;
long sort_key_size_max = 0;
long value_size_sum = 0;
long value_size_max = 0;
long row_size_max = 0;
for (int i = 0; i < scanners.size(); i++) {
fprintf(stderr, "INFO: split[%d]: %ld rows\n", i, contexts[i]->split_rows.load());
total_rows += contexts[i]->split_rows.load();
if (stat_size) {
hash_key_size_sum += contexts[i]->hash_key_size_sum.load();
hash_key_size_max = std::max(contexts[i]->hash_key_size_max.load(), hash_key_size_max);
sort_key_size_sum += contexts[i]->sort_key_size_sum.load();
sort_key_size_max = std::max(contexts[i]->sort_key_size_max.load(), sort_key_size_max);
value_size_sum += contexts[i]->value_size_sum.load();
value_size_max = std::max(contexts[i]->value_size_max.load(), value_size_max);
row_size_max = std::max(contexts[i]->row_size_max.load(), row_size_max);
}
}

std::string stop_desc;
if (error_occurred.load()) {
if (stopped_by_wait_seconds) {
Expand All @@ -2785,26 +2764,10 @@ inline bool count_data(command_executor *e, shell_context *sc, arguments args)
} else {
stop_desc = "done";
}
fprintf(stderr, "\nCount %s, total %ld rows.\n", stop_desc.c_str(), total_rows);

print_current_scan_state(contexts, stop_desc, stat_size);

if (stat_size) {
long row_size_sum = hash_key_size_sum + sort_key_size_sum + value_size_sum;
double hash_key_size_avg = total_rows == 0 ? 0.0 : (double)hash_key_size_sum / total_rows;
double sort_key_size_avg = total_rows == 0 ? 0.0 : (double)sort_key_size_sum / total_rows;
double value_size_avg = total_rows == 0 ? 0.0 : (double)value_size_sum / total_rows;
double row_size_avg = total_rows == 0 ? 0.0 : (double)row_size_sum / total_rows;
fprintf(stderr, "[hash_key].size_sum = %ld\n", hash_key_size_sum);
fprintf(stderr, "[hash_key].size_max = %ld\n", hash_key_size_max);
fprintf(stderr, "[hash_key].size_avg = %.2f\n", hash_key_size_avg);
fprintf(stderr, "[sort_key].size_sum = %ld\n", sort_key_size_sum);
fprintf(stderr, "[sort_key].size_max = %ld\n", sort_key_size_max);
fprintf(stderr, "[sort_key].size_avg = %.2f\n", sort_key_size_avg);
fprintf(stderr, "[value].size_sum = %ld\n", value_size_sum);
fprintf(stderr, "[value].size_max = %ld\n", value_size_max);
fprintf(stderr, "[value].size_avg = %.2f\n", value_size_avg);
fprintf(stderr, "[row].size_sum = %ld\n", row_size_sum);
fprintf(stderr, "[row].size_max = %ld\n", row_size_max);
fprintf(stderr, "[row].size_avg = %.2f\n", row_size_avg);
if (top_count > 0) {
top_container::top_heap heap;
for (int i = 0; i < scanners.size(); i++) {
Expand Down

0 comments on commit 342cf71

Please sign in to comment.