Skip to content

Commit

Permalink
Merge d7b36e1 into f754a3e
Browse files Browse the repository at this point in the history
  • Loading branch information
maximyurchuk authored Jan 10, 2025
2 parents f754a3e + d7b36e1 commit 2dd1e56
Show file tree
Hide file tree
Showing 3 changed files with 91 additions and 48 deletions.
49 changes: 41 additions & 8 deletions ydb/library/workload/kv/kv.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -151,10 +151,14 @@ std::string TKvWorkloadGenerator::GetDDLQueries() const {

for (size_t i = 0; i < Params.ColumnsCnt; ++i) {
if (i < Params.IntColumnsCnt) {
ss << "c" << i << " Uint64, ";
ss << "c" << i << " Uint64";
} else {
ss << "c" << i << " String, ";
ss << "c" << i << " String";
}
if (i < Params.KeyColumnsCnt && Params.GetStoreType() == TKvWorkloadParams::EStoreType::Column) {
ss << " NOT NULL";
}
ss << ", ";
}

ss << "PRIMARY KEY(";
Expand All @@ -166,13 +170,23 @@ std::string TKvWorkloadGenerator::GetDDLQueries() const {
}
ss << ")) WITH (";

if (Params.PartitionsByLoad) {
ss << "AUTO_PARTITIONING_BY_LOAD = ENABLED, ";
switch (Params.GetStoreType()) {
case TKvWorkloadParams::EStoreType::Row:
ss << "STORE = ROW, ";
if (Params.PartitionsByLoad) {
ss << "AUTO_PARTITIONING_BY_LOAD = ENABLED, ";
}
ss << "UNIFORM_PARTITIONS = " << Params.MinPartitions << ", ";
ss << "AUTO_PARTITIONING_MAX_PARTITIONS_COUNT = " << Max(Params.MinPartitions, Params.MaxPartitions) << ", ";
ss << "AUTO_PARTITIONING_PARTITION_SIZE_MB = " << Params.PartitionSizeMb << ", ";
break;
case TKvWorkloadParams::EStoreType::Column:
ss << "STORE = COLUMN, ";
break;
default:
throw yexception() << "Unsupported store type: " << Params.GetStoreType();
}
ss << "AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = " << Params.MinPartitions << ", ";
ss << "AUTO_PARTITIONING_PARTITION_SIZE_MB = " << Params.PartitionSizeMb << ", ";
ss << "UNIFORM_PARTITIONS = " << Params.MinPartitions << ", ";
ss << "AUTO_PARTITIONING_MAX_PARTITIONS_COUNT = " << Max(Params.MinPartitions, Params.MaxPartitions) << ")";
ss << "AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = " << Params.MinPartitions << ")";

return ss.str();
}
Expand Down Expand Up @@ -493,6 +507,14 @@ TVector<TRow> TKvWorkloadGenerator::GenerateRandomRows(bool randomValues) {
}

void TKvWorkloadParams::ConfigureOpts(NLastGetopt::TOpts& opts, const ECommandType commandType, int workloadType) {
opts.AddLongOption('p', "path", "Path where benchmark tables are located")
.Optional()
.DefaultValue(TableName)
.Handler1T<TStringBuf>([this](TStringBuf arg) {
while(arg.SkipPrefix("/"));
while(arg.ChopSuffix("/"));
TableName = arg;
});
switch (commandType) {
case TWorkloadParams::ECommandType::Init:
opts.AddLongOption("init-upserts", "count of upserts need to create while table initialization")
Expand All @@ -517,6 +539,17 @@ void TKvWorkloadParams::ConfigureOpts(NLastGetopt::TOpts& opts, const ECommandTy
.DefaultValue((ui64)KvWorkloadConstants::KEY_COLUMNS_CNT).StoreResult(&KeyColumnsCnt);
opts.AddLongOption("rows", "Number of rows")
.DefaultValue((ui64)KvWorkloadConstants::ROWS_CNT).StoreResult(&RowsCnt);
opts.AddLongOption("store", "Storage type."
" Options: row, column\n"
" row - use row-based storage engine;\n"
" column - use column-based storage engine.")
.DefaultValue(StoreType)
.Handler1T<TStringBuf>([this](TStringBuf arg) {
const auto l = to_lower(TString(arg));
if (!TryFromString(arg, StoreType)) {
throw yexception() << "Ivalid store type: " << arg;
}
});
break;
case TWorkloadParams::ECommandType::Run:
opts.AddLongOption("max-first-key", "Maximum value of a first primary key")
Expand Down
8 changes: 7 additions & 1 deletion ydb/library/workload/kv/kv.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@ enum KvWorkloadConstants : ui64 {

class TKvWorkloadParams : public TWorkloadParams {
public:
enum class EStoreType {
Row /* "row" */,
Column /* "column" */,
};

void ConfigureOpts(NLastGetopt::TOpts& opts, const ECommandType commandType, int workloadType) override;
THolder<IWorkloadQueryGenerator> CreateGenerator() const override;
TString GetWorkloadName() const override;
Expand All @@ -49,9 +54,10 @@ class TKvWorkloadParams : public TWorkloadParams {
ui64 MixedDoReadRows = KvWorkloadConstants::MIXED_DO_READ_ROWS;
ui64 MixedDoSelect = KvWorkloadConstants::MIXED_DO_SELECT;

const std::string TableName = "kv_test";
std::string TableName = "kv_test";

bool StaleRO = KvWorkloadConstants::STALE_RO;
YDB_READONLY(EStoreType, StoreType, EStoreType::Row);
};

class TKvWorkloadGenerator final: public TWorkloadQueryGeneratorBase<TKvWorkloadParams> {
Expand Down
82 changes: 43 additions & 39 deletions ydb/tests/workloads/kv/tests/test_workload.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
# -*- coding: utf-8 -*-
import os

import pytest

import yatest

from ydb.tests.library.harness.kikimr_runner import KiKiMR
Expand All @@ -13,47 +15,49 @@ class TestYdbKvWorkload(object):
def setup_class(cls):
cls.cluster = KiKiMR(KikimrConfigGenerator(erasure=Erasure.MIRROR_3_DC))
cls.cluster.start()
cls.init_command_prefix = [
yatest.common.binary_path(os.getenv("YDB_CLI_BINARY")),
"--verbose",
"--endpoint", "grpc://localhost:%d" % cls.cluster.nodes[1].grpc_port,
"--database=/Root",
"workload", "kv", "init",
"--min-partitions", "1",
"--partition-size", "10",
"--auto-partition", "0",
"--init-upserts", "0",
"--cols", "5",
"--int-cols", "2",
"--key-cols", "3",
]

cls.run_command_prefix = [
yatest.common.binary_path(os.getenv("YDB_CLI_BINARY")),
"--verbose",
"--endpoint", "grpc://localhost:%d" % cls.cluster.nodes[1].grpc_port,
"--database=/Root",
"workload", "kv", "run", "mixed",
"--seconds", "100",
"--threads", "10",
"--cols", "5",
"--len", "200",
"--int-cols", "2",
"--key-cols", "3"
]

@classmethod
def teardown_class(cls):
cls.cluster.stop()

def test(self):
yatest.common.execute(
[
yatest.common.binary_path(os.getenv("YDB_CLI_BINARY")),
"--verbose",
"--endpoint", "grpc://localhost:%d" % self.cluster.nodes[1].grpc_port,
"--database=/Root",

"workload", "kv", "init",

"--min-partitions", "1",
"--partition-size", "10",
"--auto-partition", "0",
"--init-upserts", "0",
"--cols", "5",
"--int-cols", "2",
"--key-cols", "3"
],
wait=True
)

yatest.common.execute(
[
yatest.common.binary_path(os.getenv("YDB_CLI_BINARY")),
"--verbose",
"--endpoint", "grpc://localhost:%d" % self.cluster.nodes[1].grpc_port,
"--database=/Root",

"workload", "kv", "run", "mixed",
"--seconds", "100",
"--threads", "10",

"--cols", "5",
"--len", "200",
"--int-cols", "2",
"--key-cols", "3"
],
wait=True
)
@pytest.mark.parametrize("store_type", ["row", "column"])
def test(self, store_type):
init_command = self.init_command_prefix
init_command.extend([
"--path", store_type,
"--store", store_type,
])
run_command = self.run_command_prefix
run_command.extend([
"--path", store_type,
])
yatest.common.execute(init_command, wait=True)
yatest.common.execute(run_command, wait=True)

0 comments on commit 2dd1e56

Please sign in to comment.