Skip to content

Commit

Permalink
Add the related test.
Browse files Browse the repository at this point in the history
Signed-off-by: Ye Cao <[email protected]>
  • Loading branch information
dashanji committed Jul 3, 2024
1 parent 391018f commit 8a13b9b
Show file tree
Hide file tree
Showing 3 changed files with 209 additions and 2 deletions.
37 changes: 37 additions & 0 deletions python/vineyard/deploy/tests/test_vineyardd_failure.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
#! /usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright 2020-2023 Alibaba Group Holding Limited.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import numpy as np

import vineyard


def test_put_meta_before_failure(vineyard_endpoint):
client = vineyard.connect(endpoint=vineyard_endpoint)

for i in range(10):
data = np.ones((1, 2, 3, 4, 5))
client.put(data, name="data-%d" % i, persist=True)


def test_get_meta_after_failure(vineyard_endpoint):
client = vineyard.connect(endpoint=vineyard_endpoint)

for i in range(10):
obj = client.get_name("data-%d" % i)
client.get_meta(obj)
86 changes: 86 additions & 0 deletions test/etcd_member_test.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/** Copyright 2020-2023 Alibaba Group Holding Limited.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

#include <map>
#include <string>

#include "boost/process.hpp" // IWYU pragma: keep

#include "client/client.h"
#include "common/util/logging.h"

using namespace vineyard; // NOLINT(build/namespaces)

int getStartedMembersSize(const std::string& etcdctl_cmd,
const std::string& etcd_endpoints) {
int members_size = 0;
boost::process::ipstream output_stream;
std::error_code ec;

std::unique_ptr<boost::process::child> etcdctl_proc_ =
std::make_unique<boost::process::child>(
etcdctl_cmd, "member", "list", "--endpoints=" + etcd_endpoints,
"--write-out=json", boost::process::std_out > output_stream,
boost::process::std_err > stderr, ec);

if (!etcdctl_proc_ || ec) {
LOG(ERROR) << "Failed to run etcdctl member list: " << ec.message();
return members_size;
}

std::stringstream buffer;
std::string line;
while (std::getline(output_stream, line)) {
buffer << line << '\n';
}

std::string output = buffer.str();
auto result = json::parse(output);
for (const auto& member : result["members"]) {
if (member.find("clientURLs") == member.end()) {
continue;
}
members_size++;
}
return members_size;
}

int main(int argc, char** argv) {
if (argc < 2) {
printf(
"usage ./etcd_member_test <ipc_socket> <etcdctl_path> "
"<etcd_endpoints>");
return 1;
}
std::string ipc_socket = std::string(argv[1]);
std::string etcdctl_cmd = std::string(argv[2]);
std::string etcd_endpoints = std::string(argv[3]);

Client client;
VINEYARD_CHECK_OK(client.Connect(ipc_socket));
LOG(INFO) << "Connected to IPCServer: " << ipc_socket;

std::map<InstanceID, json> cluster;
VINEYARD_CHECK_OK(client.ClusterInfo(cluster));
CHECK(!cluster.empty());

int members_size = getStartedMembersSize(etcdctl_cmd, etcd_endpoints);
CHECK_EQ(members_size, cluster.size());
LOG(INFO) << "Passed etcd member tests...";

client.Disconnect();

return 0;
}
88 changes: 86 additions & 2 deletions test/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -523,6 +523,83 @@ def run_vineyard_spill_tests(meta, allocator, endpoints, tests):
run_test(tests, 'spill_test')


def run_etcd_member_tests(meta, allocator, endpoints, tests):
"""
Here we start 2 vineyard instances,
and test if the etcd member is also started.
"""
with start_multiple_vineyardd(
make_metadata_settings(meta, endpoints, 'vineyard_test_%s' % time.time()),
['--allocator', allocator],
instance_size=2,
default_ipc_socket=VINEYARD_CI_IPC_SOCKET,
):
vineyard_ipc_socket = '%s.%d' % (VINEYARD_CI_IPC_SOCKET, 0)
# wait for each vineyard instance status has been updated to etcd
time.sleep(20)
etcdctl_cmd = find_executable('etcdctl')
run_test(
tests,
'etcd_member_test',
etcdctl_cmd,
endpoints,
vineyard_ipc_socket=vineyard_ipc_socket,
)


def run_vineyardd_failure_tests(meta, allocator, endpoints, test_args):
"""Here we start 3 vineyard instances, and test the single node failure scenario."""

def run_test(test_function, rpc_socket_port):
args = [
'pytest',
'-s',
'-vvv',
'--exitfirst',
'--durations=0',
'--log-cli-level',
'DEBUG',
'python/vineyard/deploy/tests/test_vineyardd_failure.py',
'-k',
test_function,
'--vineyard-endpoint=127.0.0.1:%s' % rpc_socket_port,
]

subprocess.check_call(
args, cwd=os.path.join(os.path.dirname(os.path.abspath(__file__)), '..')
)

with start_multiple_vineyardd(
make_metadata_settings(meta, endpoints, 'vineyard_test_%s' % time.time()),
['--allocator', allocator],
instance_size=3,
default_ipc_socket=VINEYARD_CI_IPC_SOCKET,
) as instances:
start_time = time.time()
rpc_socket_port = instances[0][1]
run_test('test_put_meta_before_failure', rpc_socket_port)

instances[0][0].terminate()
rpc_socket_port = instances[1][1]
# check the etcd can serve the read request
run_test('test_get_meta_after_failure', rpc_socket_port)
# check the etcd can serve the write request
run_test('test_put_meta_after_failure', rpc_socket_port)

instances[1][0].terminate()
rpc_socket_port = instances[2][1]
try:
run_test('test_get_meta_after_failure', rpc_socket_port)
except subprocess.CalledProcessError as e:
print(f"Expected error, as the etcd is scaled down to 1: {e}")

print(
'running vineyardd failure tests use %s seconds'
% (time.time() - start_time),
flush=True,
)


def run_graph_extend_test(tests):
data_dir = os.getenv('VINEYARD_DATA_DIR')
vdata = pd.read_csv(data_dir + '/p2p_v.csv')
Expand Down Expand Up @@ -823,7 +900,9 @@ def run_python_deploy_tests(meta, allocator, endpoints, test_args, with_migratio
'--durations=0',
'--log-cli-level',
'DEBUG',
'python/vineyard/deploy/tests',
'python/vineyard/deploy/tests/test_distributed.py',
'python/vineyard/deploy/tests/test_local.py',
'python/vineyard/deploy/tests/test_migration.py',
'python/vineyard/drivers/io/tests/test_migrate_stream.py',
*test_args,
'--vineyard-endpoint=localhost:%s' % rpc_socket_port,
Expand Down Expand Up @@ -1008,7 +1087,6 @@ def parse_sys_args():
default=False,
help="Whether to run python contrib pyspark tests",
)

arg_parser.add_argument(
'--with-fuse',
action='store_true',
Expand Down Expand Up @@ -1072,6 +1150,12 @@ def execute_tests(args):
args.with_migration,
)

run_etcd_member_tests(args.meta, args.allocator, endpoints, args.tests)

run_vineyardd_failure_tests(
args.meta, args.allocator, endpoints, python_test_args
)

if args.with_io:
run_io_adaptor_tests(args.meta, args.allocator, endpoints, python_test_args)

Expand Down

0 comments on commit 8a13b9b

Please sign in to comment.