Skip to content

Commit

Permalink
[DPE-4876][DPE-4943][DPE-4944] Add/Remove shards + fix error in sec…
Browse files Browse the repository at this point in the history
…ret-changed hook (#273)

* update libs

* update tests for the new form of the monitor usr URI

* legacy provider is not needed for k8s charm

* sharding components succesful start with expected args

* format + lint

* add a basic integration test

* update integration tests to work on K8s

* PR feedback

* add in flags from vm charm to test

* pr feedback

* bring libs up to date

* make corresponding changes in src code

* test components go into blocked state if no relation is present

* k8s-afy copied over test code

* use updated lib versions

* WIP adding shards to cluster

* add shard works

* test add shard

* Add remove shard tests

* add in remaining basic shard tests

* Apply suggestions from Mehdi's code review

Co-authored-by: Mehdi Bendriss <[email protected]>

* Update tests/integration/sharding_tests/helpers.py

Co-authored-by: Mehdi Bendriss <[email protected]>

* not necessary to open ports on K8s

* fix ha tests with correct call to get password

* revert change as pymongo does not serialised collection

---------

Co-authored-by: Mehdi Bendriss <[email protected]>
  • Loading branch information
MiaAltieri and Mehdi-Bendriss authored Jul 31, 2024
1 parent aed46b7 commit c636014
Show file tree
Hide file tree
Showing 6 changed files with 441 additions and 34 deletions.
119 changes: 96 additions & 23 deletions src/charm.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,14 @@ def _peers(self) -> Optional[Relation]:
"""
return self.model.get_relation(Config.Relations.PEERS)

@property
def peers_units(self) -> list[Unit]:
"""Get peers units in a safe way."""
if not self._peers:
return []
else:
return self._peers.units

@property
def mongodb_config(self) -> MongoDBConfiguration:
"""Create a configuration object with settings.
Expand Down Expand Up @@ -344,12 +352,18 @@ def is_role(self, role_name: str) -> bool:
return self.role == role_name

def has_config_server(self) -> bool:
"""TODO: Implement this function as part of sharding."""
return False
"""Returns True if we have a config-server."""
return self.get_config_server_name() is not None

def get_config_server_name(self) -> None:
"""TODO: Implement this function as part of sharding."""
return None
def get_config_server_name(self) -> str | None:
"""Returns the name of the Juju Application that the shard is using as a config server."""
if not self.is_role(Config.Role.SHARD):
logger.info(
"Component %s is not a shard, cannot be integrated to a config-server.", self.role
)
return None

return self.shard.get_config_server_name()

@db_initialised.setter
def db_initialised(self, value):
Expand Down Expand Up @@ -430,9 +444,43 @@ def drained(self) -> bool:

return self.unit_peer_data.get("drained", False)

@property
def primary(self) -> str | None:
"""Retrieves the unit with the primary replica."""
try:
with MongoDBConnection(self.mongodb_config) as mongo:
primary_ip = mongo.primary()
except PyMongoError as e:
logger.error("Unable to access primary due to: %s", e)
return None

# check if current unit matches primary ip
if primary_ip == self.unit_host(self.unit):
return self.unit.name

# check if peer unit matches primary ip
for unit in self.peers_units:
if primary_ip == self.unit_host(unit):
return unit.name

return None

# END: properties

# BEGIN: generic helper methods
def remote_mongos_config(self, hosts) -> MongosConfiguration:
"""Generates a MongosConfiguration object for mongos in the deployment of MongoDB."""
# mongos that are part of the cluster have the same username and password, but different
# hosts
return self._get_mongos_config_for_user(OperatorUser, hosts)

def remote_mongodb_config(self, hosts, replset=None, standalone=None) -> MongoDBConfiguration:
"""Generates a MongoDBConfiguration object for mongod in the deployment of MongoDB."""
# mongos that are part of the cluster have the same username and password, but different
# hosts
return self._get_mongodb_config_for_user(
OperatorUser, hosts, replset=replset, standalone=standalone
)

def _scope_opj(self, scope: Scopes):
if scope == APP_SCOPE:
Expand Down Expand Up @@ -994,6 +1042,20 @@ def _check_or_set_keyfile(self) -> None:
def _generate_keyfile(self) -> None:
self.set_secret(APP_SCOPE, "keyfile", generate_keyfile())

def get_keyfile_contents(self) -> str:
"""Retrieves the contents of the keyfile on host machine."""
# wait for keyFile to be created by leader unit
if not self.get_secret(APP_SCOPE, Config.Secrets.SECRET_KEYFILE_NAME):
logger.debug("waiting for leader unit to generate keyfile contents")

try:
container = self.unit.get_container(Config.CONTAINER_NAME)
key = container.pull(f"{Config.MONGOD_CONF_DIR}/{Config.TLS.KEY_FILE_NAME}")
return key.read()
except PathError:
logger.info("no keyfile present")
return

def _generate_secrets(self) -> None:
"""Generate passwords and put them into peer relation.
Expand Down Expand Up @@ -1126,8 +1188,11 @@ def _push_keyfile_to_workload(self, container: Container) -> None:
if not keyfile:
raise MissingSecretError(f"No secret defined for {APP_SCOPE}, keyfile")
else:
self.push_file_to_container(
container, Config.CONF_DIR, Config.TLS.KEY_FILE_NAME, keyfile
self.push_file_to_unit(
container=container,
parent_dir=Config.MONGOD_CONF_DIR,
file_name=Config.TLS.KEY_FILE_NAME,
file_contents=keyfile,
)

def push_tls_certificate_to_workload(self) -> None:
Expand All @@ -1139,48 +1204,49 @@ def push_tls_certificate_to_workload(self) -> None:

if external_ca is not None:
logger.debug("Uploading external ca to workload container")
self.push_file_to_container(
self.push_file_to_unit(
container=container,
parent_dir=Config.CONF_DIR,
parent_dir=Config.MONGOD_CONF_DIR,
file_name=Config.TLS.EXT_CA_FILE,
source=external_ca,
file_contents=external_ca,
)
if external_pem is not None:
logger.debug("Uploading external pem to workload container")
self.push_file_to_container(
self.push_file_to_unit(
container=container,
parent_dir=Config.CONF_DIR,
parent_dir=Config.MONGOD_CONF_DIR,
file_name=Config.TLS.EXT_PEM_FILE,
source=external_pem,
file_contents=external_pem,
)

# Handling of external CA and PEM files
internal_ca, internal_pem = self.tls.get_tls_files(internal=True)

if internal_ca is not None:
logger.debug("Uploading internal ca to workload container")
self.push_file_to_container(
self.push_file_to_unit(
container=container,
parent_dir=Config.CONF_DIR,
parent_dir=Config.MONGOD_CONF_DIR,
file_name=Config.TLS.INT_CA_FILE,
source=internal_ca,
file_contents=internal_ca,
)
if internal_pem is not None:
logger.debug("Uploading internal pem to workload container")
self.push_file_to_container(
self.push_file_to_unit(
container=container,
parent_dir=Config.CONF_DIR,
parent_dir=Config.MONGOD_CONF_DIR,
file_name=Config.TLS.INT_PEM_FILE,
source=internal_pem,
file_contents=internal_pem,
)

def push_file_to_container(
self, container: Container, parent_dir: str, file_name: str, source: str
def push_file_to_unit(
self, parent_dir: str, file_name: str, file_contents: str, container: Container = None
) -> None:
"""Push the file on the container, with the right permissions."""
container = container or self.unit.get_container(Config.CONTAINER_NAME)
container.push(
f"{parent_dir}/{file_name}",
source,
file_contents,
make_dirs=True,
permissions=0o400,
user=Config.UNIX_USER,
Expand All @@ -1198,7 +1264,7 @@ def delete_tls_certificate_from_workload(self) -> None:
Config.TLS.INT_PEM_FILE,
]:
try:
container.remove_path(f"{Config.CONF_DIR}/{file}")
container.remove_path(f"{Config.MONGOD_CONF_DIR}/{file}")
except PathError as err:
logger.debug("Path unavailable: %s (%s)", file, str(err))

Expand Down Expand Up @@ -1256,6 +1322,13 @@ def _connect_pbm_agent(self) -> None:
if not self.db_initialised:
return

# pbm is not functional in shards without a config-server
if self.is_role(Config.Role.SHARD) and not (
self.has_config_server() and self.shard._is_added_to_cluster()
):
logger.debug("Cannot start pbm on shard until shard is added to cluster.")
return

# must wait for leader to set URI before any attempts to update are made
if not self.get_secret(APP_SCOPE, BackupUser.get_password_key_name()):
logger.debug("_connect_pbm_agent: can't get backup user password!. Returning.")
Expand Down
15 changes: 14 additions & 1 deletion src/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

from typing import List, Literal

from ops.model import BlockedStatus


class Config:
"""Configuration for MongoDB Charm."""
Expand All @@ -17,7 +19,7 @@ class Config:
DATA_DIR = "/var/lib/mongodb"

LOG_DIR = "/var/log/mongodb"
CONF_DIR = "/etc/mongod"
MONGOD_CONF_DIR = "/etc/mongod"
MONGODB_LOG_FILENAME = "mongodb.log"

LICENSE_PATH = "/licenses/LICENSE"
Expand Down Expand Up @@ -123,6 +125,17 @@ class Secrets:
SECRET_KEYFILE_NAME = "keyfile"
MAX_PASSWORD_LENGTH = 4096

class Status:
"""Status related constants.
TODO: move all status messages here.
"""

STATUS_READY_FOR_UPGRADE = "status-shows-ready-for-upgrade"

# TODO Future PR add more status messages here as constants
UNHEALTHY_UPGRADE = BlockedStatus("Unhealthy after upgrade.")

@staticmethod
def get_license_path(license_name: str) -> str:
"""Return the path to the license file."""
Expand Down
15 changes: 13 additions & 2 deletions tests/integration/ha_tests/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@

from ..helpers import (
APP_NAME,
MONGOD_PORT,
MONGOS_PORT,
get_app_name,
get_mongo_cmd,
get_password,
Expand All @@ -51,6 +53,7 @@
ANOTHER_DATABASE_APP_NAME = "another-database"
EXCLUDED_APPS = [ANOTHER_DATABASE_APP_NAME]


logger = logging.getLogger(__name__)

mongodb_charm, application_charm = None, None
Expand Down Expand Up @@ -378,16 +381,24 @@ async def get_direct_mongo_client(


async def get_mongo_client(
ops_test: OpsTest, excluded: List[str] = [], use_subprocess_to_get_password=False
ops_test: OpsTest,
excluded: List[str] = [],
app_name: str = None,
use_subprocess_to_get_password=False,
mongos: bool = False,
) -> MongoClient:
"""Returns a direct mongodb client potentially passing over some of the units."""
mongodb_name = await get_application_name(ops_test, APP_NAME)
mongodb_name = app_name or await get_application_name(ops_test, APP_NAME)
port = MONGOS_PORT if mongos else MONGOD_PORT

for unit in ops_test.model.applications[mongodb_name].units:
if unit.name not in excluded and unit.workload_status == "active":
url = await mongodb_uri(
ops_test,
[int(unit.name.split("/")[1])],
use_subprocess_to_get_password=use_subprocess_to_get_password,
app_name=mongodb_name,
port=port,
)
return MongoClient(url, directConnection=True)
assert False, "No fitting unit could be found"
Expand Down
23 changes: 16 additions & 7 deletions tests/integration/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
METADATA = yaml.safe_load(Path("./metadata.yaml").read_text())
APP_NAME = METADATA["name"]
UNIT_IDS = [0, 1, 2]
MONGOS_PORT = 27018
MONGOD_PORT = 27017

TEST_DOCUMENTS = """[
{
Expand Down Expand Up @@ -152,17 +154,22 @@ async def get_mongo_cmd(ops_test: OpsTest, unit_name: str):


async def mongodb_uri(
ops_test: OpsTest, unit_ids: List[int] | None = None, use_subprocess_to_get_password=False
ops_test: OpsTest,
unit_ids: List[int] | None = None,
use_subprocess_to_get_password=False,
port=MONGOD_PORT,
app_name: str = APP_NAME,
) -> str:
if unit_ids is None:
unit_ids = UNIT_IDS

addresses = [await get_address_of_unit(ops_test, unit_id) for unit_id in unit_ids]
hosts = ",".join(addresses)
addresses = [await get_address_of_unit(ops_test, unit_id, app_name) for unit_id in unit_ids]
hosts = [f"{host}:{port}" for host in addresses]
hosts = ",".join(hosts)
if use_subprocess_to_get_password:
password = get_password_using_subprocess(ops_test)
password = get_password_using_subprocess(ops_test, app_name=app_name)
else:
password = await get_password(ops_test, 0)
password = await get_password(ops_test, 0, app_name=app_name)
return f"mongodb://operator:{password}@{hosts}/admin"


Expand Down Expand Up @@ -488,7 +495,9 @@ def is_pod_ready(namespace, pod_name):
wait=wait_fixed(30),
reraise=True,
)
def get_password_using_subprocess(ops_test: OpsTest, username="operator") -> str:
def get_password_using_subprocess(
ops_test: OpsTest, username="operator", app_name=APP_NAME
) -> str:
"""Use the charm action to retrieve the password from provided unit.
Returns:
Expand All @@ -503,7 +512,7 @@ def get_password_using_subprocess(ops_test: OpsTest, username="operator") -> str
result.stderr,
)
raise Exception(f"Failed to get password: {result.stderr}")
cmd = ["juju", "run", f"{APP_NAME}/leader", "get-password", f"username={username}"]
cmd = ["juju", "run", f"{app_name}/leader", "get-password", f"username={username}"]
result = subprocess.run(cmd, capture_output=True)
if result.returncode != 0:
logger.error("get-password command returned non 0 exit code: %s", result.stderr)
Expand Down
54 changes: 54 additions & 0 deletions tests/integration/sharding_tests/helpers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
#!/usr/bin/env python3
# Copyright 2024 Canonical Ltd.
# See LICENSE file for licensing details.
from typing import List, Optional


def get_cluster_shards(mongos_client) -> set:
"""Returns a set of the shard members."""
shard_list = mongos_client.admin.command("listShards")
curr_members = [member["host"].split("/")[0] for member in shard_list["shards"]]
return set(curr_members)


def has_correct_shards(mongos_client, expected_shards: List[str]) -> bool:
"""Returns true if the cluster config has the expected shards."""
shard_names = get_cluster_shards(mongos_client)
return shard_names == set(expected_shards)


def shard_has_databases(
mongos_client, shard_name: str, expected_databases_on_shard: List[str]
) -> bool:
"""Returns true if the provided shard is a primary for the provided databases."""
databases_on_shard = get_databases_for_shard(mongos_client, shard_name=shard_name)
return set(databases_on_shard) == set(expected_databases_on_shard)


def get_databases_for_shard(mongos_client, shard_name) -> Optional[List[str]]:
"""Returns the databases hosted on the given shard."""
config_db = mongos_client["config"]
if "databases" not in config_db.list_collection_names():
return None

databases_collection = config_db["databases"]

if databases_collection is None:
return

return databases_collection.distinct("_id", {"primary": shard_name})


def write_data_to_mongodb(client, db_name, coll_name, content) -> None:
"""Writes data to the provided collection and database."""
db = client[db_name]
test_collection = db[coll_name]
test_collection.insert_one(content)


def verify_data_mongodb(client, db_name, coll_name, key, value) -> bool:
"""Checks a key/value pair for a provided collection and database."""
db = client[db_name]
test_collection = db[coll_name]
query = test_collection.find({}, {key: 1})
return query[0][key] == value
Loading

0 comments on commit c636014

Please sign in to comment.