test(bdd): speedup tests by reusing clusters
Signed-off-by: Tiago Castro <[email protected]>
tiagolobocastro committed Jul 3, 2023
1 parent f6e8fbf commit 85a1faf
Showing 21 changed files with 364 additions and 106 deletions.
2 changes: 1 addition & 1 deletion control-plane/agents/src/bin/core/main.rs
@@ -134,7 +134,7 @@ fn value_parse_percent(value: &str) -> Result<u64, ParseIntError> {
async fn main() {
let cli_args = CliArgs::args();
utils::print_package_info!();
println!("Using options: {:?}", &cli_args);
println!("Using options: {cli_args:?}");
utils::tracing_telemetry::init_tracing(
"core-agent",
cli_args.tracing_tags.clone(),
9 changes: 6 additions & 3 deletions control-plane/agents/src/bin/ha/cluster/main.rs
@@ -40,6 +40,10 @@ struct Cli {
/// Add process service tags to the traces.
#[clap(short, long, env = "TRACING_TAGS", value_delimiter=',', value_parser = utils::tracing_telemetry::parse_key_value)]
tracing_tags: Vec<KeyValue>,

/// If set, configures the fast requeue period to this duration.
#[clap(long)]
fast_requeue: Option<humantime::Duration>,
}

impl Cli {
@@ -69,9 +73,8 @@ fn initialize_tracing(args: &Cli) {
#[tokio::main]
async fn main() -> anyhow::Result<()> {
utils::print_package_info!();

let cli = Cli::args();

println!("Using options: {cli:?}");
initialize_tracing(&cli);

// Initialise the core client to be used in rest
@@ -86,7 +89,7 @@ async fn main() -> anyhow::Result<()> {
let entries = store.fetch_incomplete_requests().await?;

// Node list has ref counted list internally.
let mover = volume::VolumeMover::new(store, node_list.clone());
let mover = volume::VolumeMover::new(store, cli.fast_requeue, node_list.clone());
mover.send_switchover_req(entries).await?;

info!("starting cluster-agent server");
14 changes: 12 additions & 2 deletions control-plane/agents/src/bin/ha/cluster/switchover.rs
@@ -453,6 +453,7 @@ impl SwitchOverRequest {
#[derive(Debug, Clone)]
pub(crate) struct SwitchOverEngine {
etcd: EtcdStore,
fast_requeue: Option<humantime::Duration>,
nodes: NodeList,
channel: UnboundedSender<SwitchOverRequest>,
}
@@ -471,13 +472,18 @@ enum ReQueue {

impl SwitchOverEngine {
/// Creates a new switchover engine to process Nvme path failures.
pub(crate) fn new(etcd: EtcdStore, nodes: NodeList) -> Self {
pub(crate) fn new(
etcd: EtcdStore,
fast_requeue: Option<humantime::Duration>,
nodes: NodeList,
) -> Self {
let (rq_tx, rq_rx) = unbounded_channel();

let sw = SwitchOverEngine {
channel: rq_tx,
etcd,
nodes,
fast_requeue,
};

sw.init_worker(Arc::new(Mutex::new(rq_rx)));
@@ -555,10 +561,14 @@ impl SwitchOverEngine {
/// Sends the switchover request to the channel after sleeping for some time (if necessary).
pub(crate) fn enqueue(&self, req: SwitchOverRequest) {
let tx_clone = self.channel.clone();
let fast_requeue = self.fast_requeue;
tokio::spawn(async move {
let errored_request = req.retry_count > 0;
let retry_delay = if req.retry_count < ReQueue::NumFast as u64 {
ReQueue::Fast as u64
match fast_requeue {
None => ReQueue::Fast as u64,
Some(duration) => duration.as_secs(),
}
} else {
ReQueue::Slow as u64
};
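For readers skimming the diff, the effect of the new option on the requeue delay boils down to the logic below. This is a rough Python sketch for illustration only; the numeric values assumed for the ReQueue::Fast, ReQueue::Slow and ReQueue::NumFast variants are placeholders, since their definitions sit outside this hunk.

from typing import Optional

# Assumed stand-ins for the ReQueue variants (their real values are defined
# outside this hunk).
REQUEUE_FAST_SECS = 10
REQUEUE_SLOW_SECS = 30
REQUEUE_NUM_FAST = 3

def retry_delay_secs(retry_count: int, fast_requeue: Optional[int] = None) -> int:
    """Seconds to wait before re-sending a switchover request to the worker."""
    if retry_count < REQUEUE_NUM_FAST:
        # The first few retries use the fast period; --fast-requeue lets tests
        # shrink it so switchover retries happen almost immediately.
        return fast_requeue if fast_requeue is not None else REQUEUE_FAST_SECS
    # Once the fast retries are exhausted, the slow period still applies.
    return REQUEUE_SLOW_SECS

Only the fast phase is overridden: after ReQueue::NumFast retries, the slow period is used regardless of the option.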
8 changes: 6 additions & 2 deletions control-plane/agents/src/bin/ha/cluster/volume.rs
@@ -19,8 +19,12 @@ pub(crate) struct VolumeMover {

impl VolumeMover {
/// Create a new `Self`.
pub(crate) fn new(etcd: EtcdStore, nodes: NodeList) -> Self {
let engine = SwitchOverEngine::new(etcd.clone(), nodes);
pub(crate) fn new(
etcd: EtcdStore,
fast_requeue: Option<humantime::Duration>,
nodes: NodeList,
) -> Self {
let engine = SwitchOverEngine::new(etcd.clone(), fast_requeue, nodes);
Self { engine, etcd }
}

4 changes: 4 additions & 0 deletions deployer/src/infra/agents/ha/cluster.rs
@@ -16,6 +16,10 @@ impl ComponentAction for HaClusterAgent {
)
.with_portmap("11500", "11500");

if let Some(period) = options.cluster_fast_requeue {
spec = spec.with_args(vec!["--fast-requeue", period.to_string().as_str()]);
}

let etcd = format!("etcd.{}:2379", options.cluster_label.name());
spec = spec.with_args(vec!["--store", &etcd]);

4 changes: 4 additions & 0 deletions deployer/src/lib.rs
@@ -244,6 +244,10 @@ pub struct StartOptions {
#[clap(long)]
pub node_conn_timeout: Option<humantime::Duration>,

/// If set, configures the agent-ha-cluster fast requeue period to this duration.
#[clap(long)]
cluster_fast_requeue: Option<humantime::Duration>,

/// Don't use minimum timeouts for specific requests.
#[clap(long)]
no_min_timeouts: bool,
14 changes: 13 additions & 1 deletion scripts/python/test.sh
@@ -14,7 +14,19 @@ SCRIPT_DIR="$(dirname "$0")"
export ROOT_DIR="$SCRIPT_DIR/../.."

cleanup_handler() {
"$ROOT_DIR"/scripts/python/test_residue_cleanup.sh
sudo nvme disconnect-all || true

"$ROOT_DIR"/scripts/python/test_residue_cleanup.sh || true
"$ROOT_DIR"/target/debug/deployer stop || true

for c in $(docker ps -a --filter "label=io.composer.test.name" --format '{{.ID}}') ; do
docker kill "$c" || true
docker rm -v "$c" || true
done

for n in $(docker network ls --filter "label=io.composer.test.name" --format '{{.ID}}') ; do
docker network rm "$n" || sudo systemctl restart docker && docker network rm "$n"
done
}

# FAST mode to avoid rebuilding certain dependencies
7 changes: 5 additions & 2 deletions scripts/rust/test.sh
@@ -1,18 +1,21 @@
#!/usr/bin/env bash

cleanup_handler() {
sudo nvme disconnect-all || true
"$ROOT_DIR"/target/debug/deployer stop || true

for c in $(docker ps -a --filter "label=io.composer.test.name" --format '{{.ID}}') ; do
docker kill "$c" || true
docker rm -v "$c" || true
done

for n in $(docker network ls --filter "label=io.composer.test.name" --format '{{.ID}}') ; do
docker network rm "$n" || true
docker network rm "$n" || sudo systemctl restart docker && docker network rm "$n"
done
}

trap cleanup_handler ERR INT QUIT TERM HUP
sudo nvme disconnect-all
cleanup_handler

set -euxo pipefail
# test dependencies
5 changes: 5 additions & 0 deletions tests/bdd/common/deployer.py
@@ -23,6 +23,7 @@ class StartOptions:
max_rebuilds: str = ""
ha_node_agent: bool = False
ha_cluster_agent: bool = False
ha_cluster_agent_fast: str = None
fio_spdk: bool = False
io_engine_coreisol: bool = False
io_engine_devices: [str] = ()
@@ -73,6 +74,8 @@ def args(self):
agent_arg += ",HaNode"
if self.ha_cluster_agent:
agent_arg += ",HaCluster"
if self.ha_cluster_agent_fast is not None:
args.append(f"--cluster-fast-requeue={self.ha_cluster_agent_fast}")
args.append(agent_arg)

return args
@@ -96,6 +99,7 @@ def start(
jaeger=True,
max_rebuilds="",
cluster_agent=False,
cluster_agent_fast=None,
node_agent=False,
fio_spdk=False,
io_engine_coreisol=False,
@@ -117,6 +121,7 @@
max_rebuilds=max_rebuilds,
ha_node_agent=node_agent,
ha_cluster_agent=cluster_agent,
ha_cluster_agent_fast=cluster_agent_fast,
fio_spdk=fio_spdk,
io_engine_coreisol=io_engine_coreisol,
io_engine_devices=io_engine_devices,
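With this plumbing in place, a test can opt into a short requeue period when it starts the deployer. An illustrative call follows; the one-node topology and the "200ms" value are examples, not values taken from this commit.

from common.deployer import Deployer

# Start one io-engine with the HA cluster-agent enabled and a short requeue
# period; the value is forwarded as --cluster-fast-requeue=200ms to the
# deployer and from there as --fast-requeue to the agent-ha-cluster binary.
Deployer.start(
    io_engines=1,
    cluster_agent=True,
    cluster_agent_fast="200ms",
)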
4 changes: 4 additions & 0 deletions tests/bdd/common/etcd.py
@@ -15,3 +15,7 @@ def get_nexus_info(self, volume_id, nexus_id):
# Getting the entry returns a tuple of the value and metadata.
# Return the NexusInfo value only.
return self.client.get(key)[0]

def del_switchover(self, volume_id):
key = "{}/SwitchOver/{}".format(self.__ns_key(), volume_id)
self.client.delete(key)
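A hypothetical use of the new helper: clearing a stale switchover record for a volume before a scenario reuses the cluster. The etcd handle and the UUID are illustrative; the wrapper's constructor is not part of this hunk.

# `etcd` is assumed to be an instance of this store wrapper; the UUID is an
# arbitrary example volume.
etcd.del_switchover("5cd5378e-3f05-47f1-a830-a0f5873a1449")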
68 changes: 67 additions & 1 deletion tests/bdd/common/operations.py
@@ -1,5 +1,10 @@
from retrying import retry

import openapi.exceptions
from common.apiclient import ApiClient
from common.docker import Docker
from openapi.exceptions import NotFoundException
from openapi.model.node_status import NodeStatus


class Pool(object):
@@ -10,23 +15,84 @@ def __pools_api():
# Delete all the pools in the cluster
@staticmethod
def delete_all():
all_deleted = True
for pool in Pool.__pools_api().get_pools():
try:
Pool.__pools_api().del_pool(pool.id)
except NotFoundException:
pass
except openapi.exceptions.ApiException:
all_deleted = False
pass
return all_deleted


class Volume(object):
@staticmethod
def __api():
return ApiClient.volumes_api()

# Delete all the pools in the cluster
# Delete all the volumes in the cluster
@staticmethod
def delete_all():
for volume in Volume.__api().get_volumes().entries:
try:
Volume.__api().del_volume(volume.spec.uuid)
except NotFoundException:
pass


class Snapshot(object):
@staticmethod
def __api():
return ApiClient.snapshots_api()

# Delete all the snapshots in the cluster
@staticmethod
def delete_all():
for snapshot in Snapshot.__api().get_volumes_snapshots().entries:
try:
Snapshot.__api().del_snapshot(snapshot.spec.uuid)
except NotFoundException:
pass


class Cluster(object):
# Cleanup the cluster in preparation for another test scenario
@staticmethod
def cleanup(snapshots=True, volumes=True, pools=True, waitPools=True):
# ensure core agent is up
wait_core_online()

# ensure nodes are all online
for node in ApiClient.nodes_api().get_nodes():
if node.state.status != NodeStatus("Online"):
Docker.restart_container(node.id)
for node in ApiClient.nodes_api().get_nodes():
wait_node_online(node.id)

# ensure snapshots are all deleted
if snapshots:
Snapshot.delete_all()
# ensure volumes are all deleted
if volumes:
Volume.delete_all()
# ensure pools are all deleted
if pools:
if waitPools and not Pool.delete_all():
wait_pools_deleted()


@retry(wait_fixed=10, stop_max_attempt_number=200)
def wait_node_online(node_id):
assert ApiClient.nodes_api().get_node(node_id).state.status == NodeStatus("Online")


@retry(wait_fixed=10, stop_max_attempt_number=100)
def wait_core_online():
assert ApiClient.specs_api().get_specs()


@retry(wait_fixed=100, stop_max_attempt_number=100)
def wait_pools_deleted():
assert Pool.delete_all()
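Taken together, these helpers are what lets a test module start the deployer once and merely scrub state between scenarios, which is the reuse the commit title refers to. A minimal sketch of the pattern, assuming standard pytest fixtures; the fixture names and the two-node topology are illustrative, not from this commit.

import pytest

from common.deployer import Deployer
from common.operations import Cluster

@pytest.fixture(scope="module")
def background():
    # Start the cluster once for the whole module instead of once per scenario.
    Deployer.start(io_engines=2)
    yield
    Deployer.stop()

@pytest.fixture(autouse=True)
def per_scenario(background):
    yield
    # Between scenarios, delete snapshots, volumes and pools and wait for the
    # core agent and io-engine nodes to come back online, rather than tearing
    # the whole deployment down and paying the start-up cost again.
    Cluster.cleanup()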
2 changes: 1 addition & 1 deletion tests/bdd/features/ha/cluster-agent/test_cluster_agent.py
@@ -64,7 +64,7 @@ def cluster_agent_rpc_handle():

@pytest.fixture(scope="module")
def setup():
Deployer.start(1, cluster_agent=True)
Deployer.start(0, cluster_agent=True)
yield
Deployer.stop()

52 changes: 41 additions & 11 deletions tests/bdd/features/ha/core-agent/test_target_switchover.py
@@ -16,6 +16,7 @@
from common.apiclient import ApiClient
from common.deployer import Deployer
from common.fio import Fio
from common.operations import Cluster

from openapi.model.create_pool_body import CreatePoolBody
from openapi.model.create_volume_body import CreateVolumeBody
@@ -38,6 +39,41 @@
RULE_REMOVE = "sudo iptables -t filter -D OUTPUT -o {} -d {} -p tcp --dport {} -j DROP -m comment --comment 'added by bdd test'"


@pytest.fixture
def tmp_files():
files = []
for index in range(0, 2):
files.append(f"/tmp/disk_{index}")
yield files


@pytest.fixture
def disks(tmp_files):
for disk in tmp_files:
if os.path.exists(disk):
os.remove(disk)
with open(disk, "w") as file:
file.truncate(100 * 1024 * 1024)
# /tmp is mapped into /host/tmp within the io-engine containers
yield list(map(lambda file: f"/host{file}", tmp_files))
for disk in tmp_files:
if os.path.exists(disk):
os.remove(disk)


@pytest.fixture(scope="module")
def init():
Deployer.start(
io_engines=3,
reconcile_period="300ms",
cache_period="200ms",
io_engine_coreisol=True,
fio_spdk=True,
)
yield
Deployer.stop()


@scenario(
"target_switchover.feature",
"R/W access to older target should be restricted after switchover",
@@ -84,24 +120,18 @@ def test_node_offline_should_not_fail_the_switchover():


@given("a control plane, two Io-Engine instances, two pools")
def a_control_plane_two_ioengine_instances_two_pools():
def a_control_plane_two_ioengine_instances_two_pools(init, disks):
"""a control plane, two Io-Engine instances, two pools."""
Deployer.start(
io_engines=3,
reconcile_period="10s",
io_engine_coreisol=True,
fio_spdk=True,
)
pytest.reuse_existing = False
ApiClient.pools_api().put_node_pool(
NODE_NAME_1, POOL_UUID_1, CreatePoolBody(["malloc:///disk1?size_mb=200"])
NODE_NAME_1, POOL_UUID_1, CreatePoolBody([disks[0]])
)
ApiClient.pools_api().put_node_pool(
NODE_NAME_2, POOL_UUID_2, CreatePoolBody(["malloc:///disk2?size_mb=200"])
NODE_NAME_2, POOL_UUID_2, CreatePoolBody([disks[1]])
)
pytest.reuse_existing = False
yield
cleanup_iptable_rules(IO_ENGINE_1_IP)
Deployer.stop()
Cluster.cleanup()


@given("a published volume with two replicas")
(Diffs for the remaining changed files are not shown in this view.)
