chore: post review

Gu1nness committed Oct 11, 2024
1 parent c34afe5 commit 84449c7
Showing 4 changed files with 146 additions and 33 deletions.
52 changes: 31 additions & 21 deletions src/charm.py
@@ -625,7 +625,7 @@ def _configure_container(self, container: Container) -> None:

try:
self.__filesystem_handler(container)
except FailedToElectNewPrimaryError as err:
except FailedToUpdateFilesystem as err:
raise ContainerNotReadyError from err

self.__configure_layers(container)
@@ -854,31 +854,32 @@ def _reconcile_mongo_hosts_and_users(self, event: RelationEvent) -> None:
logger.info("Deferring reconfigure: error=%r", e)
event.defer()

def _on_stop(self, event) -> None:
def __handle_partition_on_stop(self) -> None:
"""Raise partition to prevent other units from restarting if an upgrade is in progress.
If an upgrade is not in progress, the leader unit will reset the partition to 0.
"""
current_unit_number = unit_number(self.unit)
# Raise partition to prevent other units from restarting if an upgrade is in progress.
# If an upgrade is not in progress, the leader unit will reset the partition to 0.
if k8s_upgrade.partition.get(app_name=self.app.name) < current_unit_number:
k8s_upgrade.partition.set(app_name=self.app.name, value=current_unit_number)
logger.debug(f"Partition set to {current_unit_number} during stop event")

if self.unit_departed:
logger.debug(f"{self.unit.name} blocking on_stop")
is_in_replica_set = True
timeout = UNIT_REMOVAL_TIMEOUT
while is_in_replica_set and timeout > 0:
is_in_replica_set = self.is_unit_in_replica_set()
time.sleep(1)
timeout -= 1
if timeout < 0:
raise Exception(f"{self.unit.name}.on_stop timeout exceeded")
logger.debug("{self.unit.name} releasing on_stop")
self.unit_departed = False

if not self.upgrade._upgrade:
logger.debug("Peer relation missing during stop event")
return

def __handle_relation_departed_on_stop(self):
"""Leave replicaset."""
logger.debug(f"{self.unit.name} blocking on_stop")
is_in_replica_set = True
timeout = UNIT_REMOVAL_TIMEOUT
while is_in_replica_set and timeout > 0:
is_in_replica_set = self.is_unit_in_replica_set()
time.sleep(1)
timeout -= 1
if timeout <= 0 and is_in_replica_set:
raise Exception(f"{self.unit.name}.on_stop timeout exceeded")
logger.debug("{self.unit.name} releasing on_stop")
self.unit_departed = False

def __handle_upgrade_on_stop(self):
"""Sets the unit state to RESTARTING and step down from replicaset."""
self.upgrade._upgrade.unit_state = UnitState.RESTARTING

# According to the MongoDB documentation, before upgrading the primary, we must ensure a
@@ -891,6 +892,15 @@ def _on_stop(self, event) -> None:
logger.error("Failed to reelect primary before upgrading unit.")
return

def _on_stop(self, event) -> None:
self.__handle_partition_on_stop()
if self.unit_departed:
self.__handle_relation_departed_on_stop()
if not self.upgrade._upgrade:
logger.debug("Peer relation missing during stop event")
return
self.__handle_upgrade_on_stop()
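
The k8s_upgrade.partition helper used above is not part of this diff. A minimal sketch of what such a helper presumably does with lightkube follows; the class name, namespace handling, and the assumption that the StatefulSet shares the application's name are assumptions, not this repository's actual code:

    from lightkube import Client
    from lightkube.resources.apps_v1 import StatefulSet


    class _Partition:
        """Hypothetical stand-in for k8s_upgrade.partition."""

        def get(self, *, app_name: str) -> int:
            # On a rolling update, only pods with an ordinal >= partition are restarted,
            # so raising the partition above a unit's number keeps that unit running.
            sts = Client().get(StatefulSet, name=app_name)
            rolling = sts.spec.updateStrategy.rollingUpdate
            return (rolling.partition or 0) if rolling else 0

        def set(self, *, app_name: str, value: int) -> None:
            # Strategic-merge patch that touches only the partition field.
            Client().patch(
                StatefulSet,
                name=app_name,
                obj={"spec": {"updateStrategy": {"rollingUpdate": {"partition": value}}}},
            )


    partition = _Partition()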

def _on_update_status(self, event: UpdateStatusEvent):
# user-made mistakes might result in other incorrect statuses. Prioritise informing users of
# their mistake.
25 changes: 23 additions & 2 deletions src/k8s_upgrade.py
@@ -27,20 +27,22 @@
RESUME_ACTION_NAME,
ROLLBACK_INSTRUCTIONS,
AbstractUpgrade,
FailedToElectNewPrimaryError,
GenericMongoDBUpgrade,
PeerRelationNotReady,
PrecheckFailed,
UnitState,
unit_number,
)
from charms.mongodb.v1.mongodb import MongoDBConnection
from charms.mongodb.v1.mongos import BalancerNotEnabledError, MongosConnection
from lightkube.core.exceptions import ApiError
from ops import ActiveStatus, StatusBase
from ops.charm import ActionEvent
from ops.framework import EventBase, EventSource
from ops.model import BlockedStatus, Unit
from overrides import override
from tenacity import RetryError
from tenacity import RetryError, Retrying, stop_after_attempt, wait_fixed

from config import Config

@@ -189,7 +191,7 @@ def _determine_partition(
`force` and the state, we decide the new value of the partition.
A specific case:
* If we don't have action event and the upgrade_order_index is 1, we
return because it means we're waiting for the resume-upgrade/force-upgrade event to run.
return because it means we're waiting for the resume-refresh/force-refresh event to run.
"""
if not self.in_progress:
return 0
@@ -374,6 +376,24 @@ def _on_resume_upgrade_action(self, event: ActionEvent) -> None:
return
self._upgrade.reconcile_partition(action_event=event)

def step_down_primary_and_wait_reelection(self) -> None:
"""Steps down the current primary and waits for a new one to be elected."""
if len(self.charm.mongodb_config.hosts) < 2:
logger.warning(
"No secondaries to become primary - upgrading primary without electing a new one, expect downtime."
)
return

old_primary = self.charm.primary
with MongoDBConnection(self.charm.mongodb_config) as mongod:
mongod.step_down_primary()

for attempt in Retrying(stop=stop_after_attempt(15), wait=wait_fixed(1), reraise=True):
with attempt:
new_primary = self.charm.primary
if new_primary == old_primary:
raise FailedToElectNewPrimaryError()
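
MongoDBConnection.step_down_primary is defined in the shared charm library rather than in this diff. As a rough sketch, assuming a pymongo client underneath, it amounts to issuing the replSetStepDown admin command and tolerating the connection reset that older server versions perform while stepping down:

    from pymongo import MongoClient
    from pymongo.errors import AutoReconnect


    def step_down_primary(client: MongoClient, step_down_secs: int = 60) -> None:
        # Ask the current primary to relinquish its role for step_down_secs seconds.
        try:
            client.admin.command("replSetStepDown", step_down_secs)
        except AutoReconnect:
            # Servers before 4.2 close all connections when stepping down; that is expected.
            pass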

def run_post_app_upgrade_task(self, event: EventBase):
"""Runs the post upgrade check to verify that the cluster is healthy.
@@ -396,6 +416,7 @@ def run_post_app_upgrade_task(self, event: EventBase):
# upgrade.
if not self.charm.unit.is_leader() or not self.charm.is_role(Config.Role.CONFIG_SERVER):
logger.debug("Post refresh check is completed.")
self.charm.status.process_statuses()
return

self.charm.upgrade.post_cluster_upgrade_event.emit()
11 changes: 5 additions & 6 deletions tests/integration/upgrades/helpers.py
@@ -17,9 +17,10 @@ async def assert_successful_run_upgrade_sequence(
) -> None:
"""Runs the upgrade sequence on a given app."""
leader_unit = await backup_helpers.get_leader_unit(ops_test, app_name)
# action = await leader_unit.run_action("pre-refresh-check")
# await action.wait()
# assert action.status == "completed", "pre-refresh-check failed, expected to succeed."
action = await leader_unit.run_action("pre-refresh-check")
await action.wait()
assert action.status == "completed", "pre-refresh-check failed, expected to succeed."

logger.info(f"Upgrading {app_name}")

await ops_test.model.applications[app_name].refresh(path=new_charm)
@@ -39,9 +40,7 @@ async def assert_successful_run_upgrade_sequence(
await action.wait()
assert action.status == "completed", "resume-refresh failed, expected to succeed."

await ops_test.model.wait_for_idle(
apps=[app_name], status="active", timeout=1000, idle_period=30
)
await ops_test.model.wait_for_idle(apps=[app_name], timeout=1000, idle_period=30)


async def get_workload_version(ops_test: OpsTest, unit_name: str) -> str:
91 changes: 87 additions & 4 deletions tests/integration/upgrades/test_local_sharding_upgrades.py
@@ -11,17 +11,64 @@
from pytest_operator.plugin import OpsTest

from ..backup_tests.helpers import get_leader_unit
from ..ha_tests.helpers import deploy_and_scale_application, get_direct_mongo_client
from ..helpers import MONGOS_PORT, mongodb_uri
from ..sharding_tests import writes_helpers
from ..sharding_tests.helpers import deploy_cluster_components, integrate_cluster
from .helpers import assert_successful_run_upgrade_sequence, get_workload_version

SHARD_ONE_DB_NAME = "shard_one_db"
SHARD_TWO_DB_NAME = "shard_two_db"
SHARD_ONE_COLL_NAME = "test_collection"
SHARD_TWO_COLL_NAME = "test_collection"
SHARD_ONE_APP_NAME = "shard-one"
SHARD_TWO_APP_NAME = "shard-two"
CONFIG_SERVER_APP_NAME = "config-server"
CLUSTER_COMPONENTS = [SHARD_ONE_APP_NAME, SHARD_TWO_APP_NAME, CONFIG_SERVER_APP_NAME]
SHARD_APPS = [SHARD_ONE_APP_NAME, SHARD_TWO_APP_NAME]
WRITE_APP = "application"

TIMEOUT = 15 * 60


@pytest.fixture()
async def add_writes_to_shards(ops_test: OpsTest):
"""Adds writes to each shard before test starts and clears writes at the end of the test."""
application_unit = ops_test.model.applications[WRITE_APP].units[0]

start_writes_action = await application_unit.run_action(
"start-continuous-writes",
**{"db-name": SHARD_ONE_DB_NAME, "coll-name": SHARD_ONE_COLL_NAME},
)
await start_writes_action.wait()

start_writes_action = await application_unit.run_action(
"start-continuous-writes",
**{"db-name": SHARD_TWO_DB_NAME, "coll-name": SHARD_TWO_COLL_NAME},
)
await start_writes_action.wait()

# move continuous writes so they are present on each shard
mongos_client = await get_direct_mongo_client(
ops_test, app_name=CONFIG_SERVER_APP_NAME, mongos=True
)
mongos_client.admin.command("movePrimary", SHARD_ONE_DB_NAME, to=SHARD_ONE_APP_NAME)
mongos_client.admin.command("movePrimary", SHARD_TWO_DB_NAME, to=SHARD_TWO_APP_NAME)

yield
clear_writes_action = await application_unit.run_action(
"clear-continuous-writes",
**{"db-name": SHARD_ONE_DB_NAME, "coll-name": SHARD_ONE_COLL_NAME},
)
await clear_writes_action.wait()

clear_writes_action = await application_unit.run_action(
"clear-continuous-writes",
**{"db-name": SHARD_TWO_DB_NAME, "coll-name": SHARD_TWO_APP_NAME},
)
await clear_writes_action.wait()
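
If the movePrimary placement above ever needs to be verified, the config server records the primary shard of each database and can be queried through the same mongos client. A small sketch, reusing this module's helpers and assuming each shard is registered under its application name:

    async def assert_primary_shard(ops_test: OpsTest, db_name: str, expected_shard: str) -> None:
        # config.databases records which shard owns a database's unsharded collections.
        mongos_client = await get_direct_mongo_client(
            ops_test, app_name=CONFIG_SERVER_APP_NAME, mongos=True
        )
        entry = mongos_client["config"]["databases"].find_one({"_id": db_name})
        assert entry is not None, f"{db_name} is not known to the config server"
        assert entry["primary"] == expected_shard, f"unexpected primary shard {entry['primary']}"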


@pytest_asyncio.fixture(scope="module")
async def local_charm(ops_test: OpsTest) -> AsyncGenerator[Path]:
"""Builds the regular charm."""
@@ -49,10 +96,11 @@ def righty_upgrade_charm(local_charm, tmp_path: Path):
@pytest.mark.abort_on_fail
async def test_build_and_deploy(ops_test: OpsTest) -> None:
"""Build deploy, and integrate, a sharded cluster."""
await deploy_and_scale_application(ops_test)
num_units_cluster_config = {
CONFIG_SERVER_APP_NAME: 1,
SHARD_ONE_APP_NAME: 1,
SHARD_TWO_APP_NAME: 1,
CONFIG_SERVER_APP_NAME: 3,
SHARD_ONE_APP_NAME: 3,
SHARD_TWO_APP_NAME: 3,
}
await deploy_cluster_components(
ops_test,
@@ -72,6 +120,9 @@ async def test_build_and_deploy(ops_test: OpsTest) -> None:
idle_period=20,
timeout=TIMEOUT,
)
# configure write app to use mongos uri
mongos_uri = await mongodb_uri(ops_test, app_name=CONFIG_SERVER_APP_NAME, port=MONGOS_PORT)
await ops_test.model.applications[WRITE_APP].set_config({"mongos-uri": mongos_uri})


@pytest.mark.group(1)
@@ -87,7 +138,7 @@ async def test_pre_upgrade_check_success(ops_test: OpsTest) -> None:

@pytest.mark.group(1)
@pytest.mark.abort_on_fail
async def test_upgrade_config_server(ops_test: OpsTest, righty_upgrade_charm):
async def test_upgrade_cluster(ops_test: OpsTest, righty_upgrade_charm, add_writes_to_shards):
initial_version = Path("workload_version").read_text().strip()
[major, minor, patch] = initial_version.split(".")
new_version = f"{major}.{int(minor)+1}.{patch}+testupgrade"
@@ -98,6 +149,38 @@ async def test_upgrade_config_server(ops_test: OpsTest, righty_upgrade_charm):
)

await ops_test.model.wait_for_idle(apps=CLUSTER_COMPONENTS, status="active", idle_period=30)

application_unit = ops_test.model.applications[WRITE_APP].units[0]
stop_writes_action = await application_unit.run_action(
"stop-continuous-writes",
**{"db-name": SHARD_ONE_DB_NAME, "coll-name": SHARD_ONE_COLL_NAME},
)
await stop_writes_action.wait()
shard_one_expected_writes = int(stop_writes_action.results["writes"])
stop_writes_action = await application_unit.run_action(
"stop-continuous-writes",
**{"db-name": SHARD_TWO_DB_NAME, "coll-name": SHARD_TWO_COLL_NAME},
)
await stop_writes_action.wait()
shard_two_total_expected_writes = int(stop_writes_action.results["writes"])

actual_shard_one_writes = await writes_helpers.count_shard_writes(
ops_test,
config_server_name=CONFIG_SERVER_APP_NAME,
db_name=SHARD_ONE_DB_NAME,
)
actual_shard_two_writes = await writes_helpers.count_shard_writes(
ops_test,
config_server_name=CONFIG_SERVER_APP_NAME,
db_name=SHARD_TWO_DB_NAME,
)

assert (
actual_shard_one_writes == shard_one_expected_writes
), "missed writes during upgrade procedure."
assert (
actual_shard_two_writes == shard_two_total_expected_writes
), "missed writes during upgrade procedure."
for sharding_component in CLUSTER_COMPONENTS:
for unit in ops_test.model.applications[sharding_component].units:
workload_version = await get_workload_version(ops_test, unit.name)
