From 68ce2084b6ed45ec7ed8633cc482db320f5289b3 Mon Sep 17 00:00:00 2001 From: Oliver Schmidt Date: Mon, 14 Oct 2024 11:40:00 +0200 Subject: [PATCH] s3 management: add user management - accounting now uses the command runner utility code - s3 users are now managed (created, deleted and updated) Re PL-133084 --- ...1114_110006_PL-133084-s3-usermgmt_scriv.md | 18 + nixos/roles/ceph/rgw.nix | 10 +- pkgs/fc/agent/default.nix | 6 + pkgs/fc/agent/fc/manage/s3accounting.py | 47 -- pkgs/fc/agent/fc/manage/s3users.py | 353 +++++++++ .../fc/manage/tests/fakedirectory_s3users.py | 56 ++ pkgs/fc/agent/fc/manage/tests/test_s3users.py | 719 ++++++++++++++++++ pkgs/fc/agent/fc/util/runners.py | 66 +- pkgs/fc/agent/pytest.ini | 2 + pkgs/fc/agent/setup.py | 2 +- pkgs/fc/ceph/src/fc/ceph/util/__init__.py | 3 +- 11 files changed, 1221 insertions(+), 61 deletions(-) create mode 100644 changelog.d/20241114_110006_PL-133084-s3-usermgmt_scriv.md delete mode 100644 pkgs/fc/agent/fc/manage/s3accounting.py create mode 100644 pkgs/fc/agent/fc/manage/s3users.py create mode 100644 pkgs/fc/agent/fc/manage/tests/fakedirectory_s3users.py create mode 100644 pkgs/fc/agent/fc/manage/tests/test_s3users.py create mode 100644 pkgs/fc/agent/pytest.ini diff --git a/changelog.d/20241114_110006_PL-133084-s3-usermgmt_scriv.md b/changelog.d/20241114_110006_PL-133084-s3-usermgmt_scriv.md new file mode 100644 index 000000000..35cfb01e8 --- /dev/null +++ b/changelog.d/20241114_110006_PL-133084-s3-usermgmt_scriv.md @@ -0,0 +1,18 @@ + + + +### NixOS XX.XX platform + +- S3 users are now managed automatically and can be viewed and managed via our + customer portal. 
(PL-133084) diff --git a/nixos/roles/ceph/rgw.nix b/nixos/roles/ceph/rgw.nix index 4479bc7c0..d7f8fc6ed 100644 --- a/nixos/roles/ceph/rgw.nix +++ b/nixos/roles/ceph/rgw.nix @@ -173,13 +173,13 @@ in ''; }; - systemd.services.fc-ceph-rgw-accounting = rec { - description = "Upload S3 usage data to the Directory"; + systemd.services.fc-ceph-rgw-users = rec { + description = "Sync S3 users and accounting with directory"; path = [ cephPkgs.ceph ]; serviceConfig.Type = "oneshot"; wants = [ fclib.network.sto.addressUnit ]; after = wants; - script = "${pkgs.fc.agent}/bin/fc-s3accounting --enc ${config.flyingcircus.encPath}"; + script = "${pkgs.fc.agent}/bin/fc-s3users --enc ${config.flyingcircus.encPath}"; }; flyingcircus.services.sensu-client.checks = { @@ -223,10 +223,10 @@ in }; }; - systemd.timers.fc-ceph-rgw-accounting = { + systemd.timers.fc-ceph-rgw-users = { enable = ! config.flyingcircus.services.ceph.server.passive; - description = "Timer for uploading S3 usage data to the Directory"; + description = "Timer for syncing S3 users and accounting with the directory"; wantedBy = [ "timers.target" ]; timerConfig = { Persistent = true; diff --git a/pkgs/fc/agent/default.nix b/pkgs/fc/agent/default.nix index 2817e2034..32404e109 100644 --- a/pkgs/fc/agent/default.nix +++ b/pkgs/fc/agent/default.nix @@ -98,4 +98,10 @@ buildPythonPackage rec { checkInputs ++ [ py.pytest ] ++ propagatedBuildInputs ); + outputs = [ "out" "qa" ]; + + postCheck = '' + cp -a htmlcov/ $qa/ + ''; + } diff --git a/pkgs/fc/agent/fc/manage/s3accounting.py b/pkgs/fc/agent/fc/manage/s3accounting.py deleted file mode 100644 index e6c0b2562..000000000 --- a/pkgs/fc/agent/fc/manage/s3accounting.py +++ /dev/null @@ -1,47 +0,0 @@ -"""Uploads usage data from Ceph/RadosGW into the Directory""" - -import argparse -import json -import subprocess - -from fc.util.directory import connect - - -def main(): - parser = argparse.ArgumentParser( - description="Flying Circus S3 usage accounting" - ) - 
parser.add_argument( - "-E", - "--enc", - default="/etc/nixos/enc.json", - help="Path to enc.json (default: %(default)s)", - ) - - args = parser.parse_args() - with open(args.enc) as f: - enc = json.load(f) - - result = subprocess.run( - ["radosgw-admin", "user", "list"], check=True, capture_output=True - ) - users = json.loads(result.stdout) - - usage = dict() - for user in users: - result = subprocess.run( - ["radosgw-admin", "user", "stats", "--uid", user], - check=True, - capture_output=True, - ) - stats = json.loads(result.stdout) - usage[user] = str(stats["stats"]["total_bytes"]) - - location = enc["parameters"]["location"] - - directory = connect(enc, ring=0) - directory.store_s3(location, usage) - - -if __name__ == "__main__": - main() diff --git a/pkgs/fc/agent/fc/manage/s3users.py b/pkgs/fc/agent/fc/manage/s3users.py new file mode 100644 index 000000000..fbd537976 --- /dev/null +++ b/pkgs/fc/agent/fc/manage/s3users.py @@ -0,0 +1,353 @@ +"""S3 user-oriented actions: + +- update users based on directory data +- report accounting for usage + +""" + +import argparse +import json +import logging +import sys +from dataclasses import dataclass +from subprocess import CalledProcessError + +from fc.util.directory import connect +from fc.util.runners import run + +log = logging.getLogger() + + +def list_radosgw_users() -> list[str]: + """List all uids of users known to the local radosgw""" + return run.json.radosgw_admin("user", "list") + + +def accounting(location: str, dir_conn): + """Uploads usage data from Ceph/RadosGW into the Directory""" + users = list_radosgw_users() + + usage = dict() + for user in users: + try: + stats = run.json.radosgw_admin( + "user", + "stats", + "--uid", + user, + silent_errors=( + lambda code, stdout, stderr: code == 2 + and b"User has not been initialized or user does not exist" + in stderr + ), + ) + usage[user] = str(stats["stats"]["total_bytes"]) + except CalledProcessError as e: + logging.error(f"Could not get user statistics: 
{e.stderr}") + dir_conn.store_s3(location, usage) + + +class RGWState: + uid: str + display_name: str | None = None + access_key: str | None = None + secret_key: str | None = None + + key_count = 0 + + exists = False + + def __init__(self, uid): + self.uid = uid + + def update(self): + try: + state = run.json.radosgw_admin("user", "info", "--uid", self.uid) + except Exception: + self.exists = False + self.display_name = None + self.key_count = 0 + self.access_key = None + self.secret_key = None + else: + self.exists = True + self.display_name = state["display_name"] + + self.key_count = len(state["keys"]) + + try: + # we silently ignore any additional keys here, logging this case is + # left to an explicit check method. + main_key = state["keys"][0] + except IndexError: + self.access_key = None + self.secret_key = None + else: + self.access_key = main_key["access_key"] + self.secret_key = main_key["secret_key"] + + def ensure_deleted(self): + if not self.exists: + return + try: + # --purge-keys is not really necessary, but still do it + run.radosgw_admin( + "user", "rm", + "--uid", self.uid, + "--purge-data", + "--purge-keys", + ) # fmt: skip + except CalledProcessError as err: + if err.returncode == 2 and self.uid not in list_radosgw_users(): + # potential atomicity problem, but user is gone -> all good + pass + else: + raise + self.update() + + def ensure_exists( + self, display_name: str, access_key: str, secret_key: str | None = None + ): + """Ensures that a radosgw user with the desired properties and keys exists. + + Called upon user creation, as well as when rotating key or information. + + """ + if not self.exists: + if not secret_key: + log.warning( + "user create: no secret key provided, user is created with " + "an autogenerated secret." + ) + run.radosgw_admin( + "user", "create", + "--uid", self.uid, + "--display-name", display_name, + # Security Warning: by passing around the keys as command line + # arguments, we potentially leak them via ps/ proc. 
This is + # acceptable for now, as ceph hosts are accessible to admins only. + # A preferential alternative would be the ability for `radosgw-admin` + # to read from env variables. There's also the admin RESTful API of + # radosgw, unfortunately that's based on S3 authentication logic. + # Implementing this, e.g. via boto3, is rather complex and not a pleasure. + "--access-key", access_key, + '--gen-secret' + ) # fmt: skip + + # we only modify the keys if we have the secret available + # only modify/ add keys when we have sufficient data: + update_args: list[str] = [] + if self.display_name != display_name: + update_args += ["--display-name", display_name] + if secret_key: + update_args += ["--access-key", access_key] + update_args += ["--secret-key", run.redacted(secret_key)] + if update_args: + run.radosgw_admin( + "user", "modify", + "--uid", self.uid, + *update_args, + ) # fmt: skip + self.update() + + def ensure_no_keys(self): + """Remove _all_ keys to make users aware of the impending hard deletion.""" + keys = run.json.radosgw_admin("user", "info", "--uid", self.uid)["keys"] + for key in keys: + run.radosgw_admin("key", "rm", "--access-key", key["access_key"]) + if keys: + self.update() + + +@dataclass +class DirectoryState: + uid: str + display_name = None + access_key = None + secret_key = None + deletion_stages: list[str] + exists = False + key_count = 1 # We always expect exactly 1 key for now. 
+ + def __init__(self, uid: str): + self.uid = uid + self.deletion_stages = [] + + def update(self, user_dict): + self.exists = True + self.display_name = user_dict["display_name"] + self.access_key = user_dict["access_key"] + self.secret_key = user_dict["secret_key"] + self.deletion_stages = user_dict["deletion"]["stages"] + + +class User: + def __init__(self, uid: str): + self.uid = uid + self.rgw = RGWState(uid) + self.directory = DirectoryState(uid) + + @property + def should_exist(self): + return self.directory.exists and not self.should_be_deleted + + @property + def should_be_deleted(self): + return "hard" in self.directory.deletion_stages + + def ensure(self): + if self.should_exist: + if "soft" in self.directory.deletion_stages: + # Only create without keeping the keys up to date to avoid + # stupid key creation/deletion cycles. + self.rgw.ensure_exists( + self.directory.display_name, self.directory.access_key + ) + self.rgw.ensure_no_keys() + else: + self.rgw.ensure_exists( + self.directory.display_name, + self.directory.access_key, + self.directory.secret_key, + ) + elif self.should_be_deleted: + self.rgw.ensure_deleted() + + def compare_states(self): + compare_properties = ("display_name", "access_key", "key_count") + for name in compare_properties: + rgw_value = getattr(self.rgw, name) + directory_value = getattr(self.directory, name) + if rgw_value != directory_value: + yield ( + f"- Differing {name}: " + f"RGW has {rgw_value!r}, directory has {directory_value!r}" + ) + + def validate(self) -> bool: + mismatches: list[str] = [] + + if self.should_exist: + if self.rgw.exists: + if "soft" not in self.directory.deletion_stages: + # In soft deletion state we don't care whether the attributes are + # correct. Specifically the key count won't match. 
+ mismatches.extend(self.compare_states()) + else: + mismatches.append("- not found in local users") + else: + if self.rgw.exists: + mismatches.append( + "- is not known in the directory but exists (unmanaged) in RGW" + ) + + if mismatches: + log.error( + f"User data mismatch for {self.uid}:\n" + + "\n\t".join(mismatches) + ) + return False + else: + return True + + +class UserManager: + users: dict[str, User] + + def __init__(self, directory_connection, location: str, rg: str): + self.dir_conn = directory_connection + self.location = location + self.rg = rg + self.processing_errors = False + self.users = {} + + # Get the desired state from the directory + directory_info = self.dir_conn.list_s3_users(self.location, self.rg) + for uid, user_dict in directory_info.items(): + user = self.get_user(uid) + user.directory.update(user_dict) + + # Get user objects for all local users + for uid in list_radosgw_users(): + user = self.get_user(uid) + user.rgw.update() + + def get_user(self, uid) -> User: + return self.users.setdefault(uid, User(uid)) + + def report_local_users_to_directory(self): + self.dir_conn.update_s3_users( + { + user.uid: { + "display_name": user.rgw.display_name, + "access_key": user.rgw.access_key, + # directory ring0 API wants to have RG and location + # as explicit values + "location": self.location, + "storage_resource_group": self.rg, + "secret_key": None, + } + for user in self.users.values() + if user.rgw.exists + } + ) + + def sync_users(self): + for user in self.users.values(): + try: + user.ensure() + user.validate() + except Exception: + # individual errors shall not block progress on all other users + log.exception( + f"Encountered an error while handling {user.uid}, continuing:" + ) + self.processing_errors = True + + # report all users present with uid, display_name, access_key + self.report_local_users_to_directory() + + +def main() -> int: + parser = argparse.ArgumentParser( + description="Flying Circus S3 user management and accounting" 
+ ) + parser.add_argument( + "-E", + "--enc", + default="/etc/nixos/enc.json", + help="Path to enc.json (default: %(default)s)", + ) + + args = parser.parse_args() + with open(args.enc) as f: + enc = json.load(f) + + directory = connect(enc, ring="max") + location = enc["parameters"]["location"] + + # Accounting first, based on the currently existing users: ensure users + # that are deleted are accounted as accurately as possible. Users that + # are about to be created will be accounted in the next run - they can't + # possibly consume anything right now anyway. + got_errors = False + try: + accounting(location, directory) + except Exception: + log.exception("Error during accounting:") + got_errors = True + log.warning("Continuing user management despite accounting errors.") + + user_manager = UserManager( + directory, location, enc["parameters"]["resource_group"] + ) + user_manager.sync_users() + got_errors = got_errors or user_manager.processing_errors + + # on errors, the service shall return a non-zero exit code to be caught by + # our monitoring + return 2 if got_errors else 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/pkgs/fc/agent/fc/manage/tests/fakedirectory_s3users.py b/pkgs/fc/agent/fc/manage/tests/fakedirectory_s3users.py new file mode 100644 index 000000000..bb5698d97 --- /dev/null +++ b/pkgs/fc/agent/fc/manage/tests/fakedirectory_s3users.py @@ -0,0 +1,56 @@ +import logging +from pprint import pprint +from xmlrpc.server import SimpleXMLRPCRequestHandler, SimpleXMLRPCServer + + +class RequestHandler(SimpleXMLRPCRequestHandler): + rpc_paths = () + + +class Directory(object): + """A fake directory implementation to allow code in the integration + tests to be properly exercised. 
+ """ + + def store_s3(self, location, usage): + print("store_s3") + pprint(location) + pprint(usage) + + def list_s3_users(self, location, storage_resource_group_filter): + print("list_s3_users with", location, storage_resource_group_filter) + return { + "fc": { + "location": "rzob", + "storage_resource_group": "services", + "display_name": "FC user", + "access_key": "ubbsAFsG", + "secret_key": None, + "deletion": {"deadline": "", "stages": []}, + }, + "services:sometest": { + "location": "rzob", + "storage_resource_group": "services", + "display_name": "test modified", + "access_key": "dnDlid0jyRs1sK9vEOGV", + "secret_key": "VqBfxCqupucBSjo7ksDcf4K6vhgsIdGKnL0ielLi", + # "secret_key": None, + "deletion": {"deadline": "", "stages": ["soft"]}, + }, + } + + def update_s3_users(self, users_report): + print("update_s3_users") + pprint(users_report) + + +logging.basicConfig(level=logging.DEBUG) +# Create server +with SimpleXMLRPCServer( + ("0.0.0.0", 2342), requestHandler=RequestHandler, allow_none=True +) as server: + server.register_introspection_functions() + server.register_instance(Directory()) + + # Run the server's main loop + server.serve_forever() diff --git a/pkgs/fc/agent/fc/manage/tests/test_s3users.py b/pkgs/fc/agent/fc/manage/tests/test_s3users.py new file mode 100644 index 000000000..c1c958295 --- /dev/null +++ b/pkgs/fc/agent/fc/manage/tests/test_s3users.py @@ -0,0 +1,719 @@ +import json +import logging +from unittest.mock import Mock, call + +import pytest +from fc.manage.s3users import ( + DirectoryState, + RGWState, + User, + UserManager, + accounting, +) + +# TODO: directory ring api mock (xmlrpc server) + + +@pytest.fixture +def subprocess_run(monkeypatch): + mock_obj = Mock() + monkeypatch.setattr("subprocess.run", mock_obj) + return mock_obj + + +def test_object_instances_trivial(subprocess_run): + user = User("test") + assert user.uid == "test" + assert isinstance(user.rgw, RGWState) + assert isinstance(user.directory, DirectoryState) + + assert 
not user.should_exist + assert not user.should_be_deleted + + subprocess_run().stdout = json.dumps( + { + "display_name": "Test User", + "keys": [ + { + "access_key": "12345", + "secret_key": "abcde", + } + ], + } + ) + user.rgw.update() + assert user.rgw.display_name == "Test User" + assert user.rgw.key_count == 1 + assert user.rgw.access_key == "12345" + assert user.rgw.secret_key == "abcde" + + user.directory.update( + { + "display_name": "Test User", + "access_key": "12345", + "secret_key": "abcde", + "deletion": {"stages": ["foo"]}, + } + ) + assert user.directory.display_name == "Test User" + assert user.directory.access_key == "12345" + assert user.directory.secret_key == "abcde" + assert user.directory.deletion_stages == ["foo"] + assert user.directory.key_count == 1 + + assert list(user.compare_states()) == [] + assert user.validate() + + +def test_rgw_state_ignores_additional_keys( + subprocess_run, +): + user = User("test") + + subprocess_run().stdout = json.dumps( + { + "display_name": "Test User", + "keys": [ + { + "access_key": "12345", + "secret_key": "abcde", + }, + { + "access_key": "67890", + "secret_key": "fghij", + }, + ], + } + ) + user.rgw.update() + + assert user.rgw.key_count == 2 + assert user.rgw.access_key == "12345" + assert user.rgw.secret_key == "abcde" + + +def test_ensure_exists_creates_user(subprocess_run, caplog, capfd): + caplog.set_level(logging.INFO) + + user = User("services:sometest") + user.directory.update( + { + "display_name": "test test", + "access_key": "dnDlid0jyRs1sK9vEOGV", + "secret_key": "directory-provided-secret-key", + "deletion": {"stages": []}, + } + ) + + subprocess_run.side_effect = [ + Mock(stdout=""), + Mock(stdout=""), + Mock( + stdout=json.dumps( + { + "display_name": "test test", + "keys": [ + { + "access_key": "dnDlid0jyRs1sK9vEOGV", + "secret_key": "directory-provided-secret-key", + }, + ], + } + ) + ), + ] + + assert not user.rgw.exists + user.ensure() + assert user.rgw.exists + assert 
user.rgw.display_name == "test test" + assert user.rgw.access_key == "dnDlid0jyRs1sK9vEOGV" + assert user.rgw.secret_key == "directory-provided-secret-key" + assert user.rgw.key_count == 1 + + assert subprocess_run.call_args_list == [ + call( + [ + "radosgw-admin", "user", "create", + "--uid", "services:sometest", + "--display-name", "test test", + "--access-key", "dnDlid0jyRs1sK9vEOGV", + "--gen-secret" + ], + check=True, stdout=-1, stderr=-1, + ), + call( + [ + "radosgw-admin", "user", "modify", + "--uid", "services:sometest", + "--display-name", "test test", + "--access-key", "dnDlid0jyRs1sK9vEOGV", + "--secret-key", "directory-provided-secret-key", + ], + check=True, stdout=-1, stderr=-1, + ), + call( + [ + "radosgw-admin", "--format", "json", + "user", "info", + "--uid", "services:sometest", + ], + check=True, stdout=-1, stderr=-1, + ), + ] # fmt: skip + + captured = capfd.readouterr() + assert "directory-provided-secret-key" not in caplog.text + assert "directory-provided-secret-key" not in captured.out + assert "directory-provided-secret-key" not in captured.err + assert "" in captured.out + + +def test_ensure_exists_creates_user_no_secret_provided(subprocess_run, caplog): + import logging + + caplog.set_level(logging.INFO) + + user = User("services:sometest") + user.directory.update( + { + "display_name": "test test", + "access_key": "dnDlid0jyRs1sK9vEOGV", + "secret_key": None, + "deletion": {"stages": []}, + } + ) + + subprocess_run.side_effect = [ + Mock(stdout=""), + Mock(stdout=""), + Mock( + stdout=json.dumps( + { + "display_name": "test test", + "keys": [ + { + "access_key": "dnDlid0jyRs1sK9vEOGV", + "secret_key": "randomsecretkey", + }, + ], + } + ) + ), + ] + + assert not user.rgw.exists + user.ensure() + assert user.rgw.exists + assert user.rgw.display_name == "test test" + assert user.rgw.access_key == "dnDlid0jyRs1sK9vEOGV" + assert user.rgw.secret_key == "randomsecretkey" + assert user.rgw.key_count == 1 + + assert subprocess_run.call_args_list 
== [ + call( + [ + "radosgw-admin", "user", "create", + "--uid", "services:sometest", + "--display-name", "test test", + "--access-key", "dnDlid0jyRs1sK9vEOGV", + "--gen-secret" + ], + check=True, stdout=-1, stderr=-1, + ), + call( + [ + "radosgw-admin", "user", "modify", + "--uid", "services:sometest", + "--display-name", "test test", + ], + check=True, stdout=-1, stderr=-1, + ), + call( + [ + "radosgw-admin", "--format", "json", + "user", "info", + "--uid", "services:sometest", + ], + check=True, stdout=-1, stderr=-1, + ), + ] # fmt: skip + + assert "no secret key provided" in caplog.text + + +def test_ensure_updates_users(subprocess_run): + user = User("services:sometest") + user.directory.update( + { + "display_name": "a new display name", + "access_key": "the access key", + "secret_key": "a new secret key", + "deletion": {"stages": []}, + } + ) + + subprocess_run.side_effect = [ + Mock( + stdout=json.dumps( + { + "display_name": "old display name", + "keys": [ + { + "access_key": "the access key", + "secret_key": "old secret key", + }, + ], + } + ) + ), + Mock(stdout=""), + Mock( + stdout=json.dumps( + { + "display_name": "new display name", + "keys": [ + { + "access_key": "the access key", + "secret_key": "new secret key", + }, + ], + } + ) + ), + ] + + user.rgw.update() + assert user.rgw.exists + assert user.rgw.display_name == "old display name" + assert user.rgw.access_key == "the access key" + assert user.rgw.secret_key == "old secret key" + + user.ensure() + + assert user.rgw.exists + assert user.rgw.display_name == "new display name" + assert user.rgw.access_key == "the access key" + assert user.rgw.secret_key == "new secret key" + assert user.rgw.key_count == 1 + + assert subprocess_run.call_args_list == [ + call( + [ + "radosgw-admin", "--format", "json", + "user", "info", + "--uid", "services:sometest", + ], + check=True, stdout=-1, stderr=-1, + ), + call( + [ + "radosgw-admin", "user", "modify", + "--uid", "services:sometest", + "--display-name", "a 
new display name", + "--access-key", "the access key", + "--secret-key", "a new secret key", + ], + check=True, stdout=-1, stderr=-1, + ), + call( + [ + "radosgw-admin", "--format", "json", + "user", "info", + "--uid", "services:sometest", + ], + check=True, stdout=-1, stderr=-1, + ), + ] # fmt: skip + + +def test_validation(caplog): + user = User("test") + + user.directory.exists = True + user.rgw.exists = False + + assert not user.validate() + assert list(user.compare_states()) == [ + "- Differing key_count: RGW has 0, directory has 1" + ] + + assert ( + "User data mismatch for test:\n- not found in local users\n" + in caplog.text + ) + + user.directory.exists = False + user.rgw.exists = True + + assert not user.validate() + assert ( + "- is not known in the directory but exists (unmanaged) in RGW" + in caplog.text + ) + + user.directory.exists = True + user.rgw.exists = True + + user.directory.display_name = "directory" + user.directory.access_key = "directory" + user.directory.secret_key = "directory" + + user.rgw.display_name = "rgw" + user.rgw.access_key = "rgw" + user.rgw.secret_key = "rgw" + + assert list(user.compare_states()) == [ + "- Differing display_name: RGW has 'rgw', directory has 'directory'", + "- Differing access_key: RGW has 'rgw', directory has 'directory'", + "- Differing key_count: RGW has 0, directory has 1", + ] + assert not user.validate() + + user.directory.exists = False + user.rgw.exists = False + assert user.validate() + + +def test_users_pending_soft_deletion_still_added(subprocess_run, caplog): + user = User("services:sometest") + user.directory.update( + { + "display_name": "test test", + "access_key": "dnDlid0jyRs1sK9vEOGV", + "secret_key": None, + "deletion": {"stages": ["soft"]}, + } + ) + + subprocess_run.side_effect = [ + Mock(stdout=""), + Mock(stdout=""), + Mock( + stdout=json.dumps( + { + "display_name": "test test", + "keys": [ + { + "access_key": "dnDlid0jyRs1sK9vEOGV", + "secret_key": "random-key", + }, + ], + } + ) + ), 
+ Mock( + stdout=json.dumps( + { + "display_name": "test test", + "keys": [ + { + "access_key": "some-old-key", + "secret_key": "...", + }, + { + "access_key": "some-other-old-key", + "secret_key": "...", + }, + ], + } + ) + ), + Mock(stdout=""), + Mock(stdout=""), + Mock( + stdout=json.dumps( + { + "display_name": "test test", + "keys": [], + } + ) + ), + ] + + assert not user.rgw.exists + user.ensure() + assert user.rgw.exists + assert user.rgw.display_name == "test test" + assert user.rgw.access_key == None + assert user.rgw.secret_key == None + assert user.rgw.key_count == 0 + + assert subprocess_run.call_args_list == [ + call( + [ + "radosgw-admin", "user", "create", + "--uid", "services:sometest", + "--display-name", "test test", + "--access-key", "dnDlid0jyRs1sK9vEOGV", + "--gen-secret" + ], + check=True, stdout=-1, stderr=-1, + ), + call( + [ + "radosgw-admin", "user", "modify", + "--uid", "services:sometest", + "--display-name", "test test", + ], + check=True, stdout=-1, stderr=-1, + ), + call( + [ + "radosgw-admin", "--format", "json", + "user", "info", + "--uid", "services:sometest", + ], + check=True, stdout=-1, stderr=-1, + ), + call( + [ + "radosgw-admin", "--format", "json", + "user", "info", + "--uid", "services:sometest", + ], + check=True, stdout=-1, stderr=-1, + ), + call( + [ + "radosgw-admin", "key", "rm", + "--access-key", "some-old-key", + ], + check=True, stdout=-1, stderr=-1, + ), + call( + [ + "radosgw-admin", "key", "rm", + "--access-key", "some-other-old-key", + ], + check=True, stdout=-1, stderr=-1, + ), + call( + [ + "radosgw-admin", "--format", "json", + "user", "info", + "--uid", "services:sometest", + ], + check=True, stdout=-1, stderr=-1, + ), + ] # fmt: skip + + assert user.validate() + + +def test_ensure_does_not_recreate_hard_deleted_users(subprocess_run): + user = User("services:sometest") + user.directory.update( + { + "display_name": "test test", + "access_key": "dnDlid0jyRs1sK9vEOGV", + "secret_key": None, + "deletion": 
{"stages": ["soft", "hard"]}, + } + ) + + subprocess_run.side_effect = [] + + assert not user.rgw.exists + user.ensure() + assert not user.rgw.exists + assert user.rgw.display_name is None + assert user.rgw.access_key is None + assert user.rgw.secret_key is None + assert user.rgw.key_count == 0 + + assert subprocess_run.call_args_list == [] + + assert user.validate() + assert list(user.compare_states()) == [ + "- Differing display_name: RGW has None, directory has 'test test'", + "- Differing access_key: RGW has None, directory has 'dnDlid0jyRs1sK9vEOGV'", + "- Differing key_count: RGW has 0, directory has 1", + ] + + +def test_ensure_hard_state_deletes_users(subprocess_run): + user = User("services:sometest") + user.directory.update( + { + "display_name": "test test", + "access_key": "dnDlid0jyRs1sK9vEOGV", + "secret_key": "asdf", + "deletion": {"stages": []}, + } + ) + + subprocess_run.side_effect = [ + Mock(stdout=""), + Mock(stdout=""), + Mock( + stdout=json.dumps( + { + "display_name": "test test", + "keys": [ + { + "access_key": "dnDlid0jyRs1sK9vEOGV", + "secret_key": "asdf", + }, + ], + } + ) + ), + ] + + # Create the user + user.ensure() + + assert user.rgw.exists + assert user.rgw.display_name == "test test" + assert user.rgw.key_count == 1 + assert user.rgw.access_key == "dnDlid0jyRs1sK9vEOGV" + assert user.rgw.secret_key == "asdf" + + subprocess_run.reset_mock() + + assert subprocess_run.call_args_list == [] + + user.directory.update( + { + "display_name": "test test", + "access_key": "dnDlid0jyRs1sK9vEOGV", + "secret_key": "asdf", + "deletion": {"stages": ["soft", "hard"]}, + } + ) + + subprocess_run.side_effect = [ + Mock(stdout=""), + Mock(stdout=""), + ] + + user.ensure() + + assert not user.rgw.exists + assert user.rgw.display_name is None + assert user.rgw.access_key is None + assert user.rgw.secret_key is None + assert user.rgw.key_count == 0 + + assert subprocess_run.call_args_list == [ + call( + [ + "radosgw-admin", "user", "rm", + "--uid", 
"services:sometest", + "--purge-data", "--purge-keys", + ], + check=True, stdout=-1, stderr=-1, + ), + call( + [ + "radosgw-admin", "--format", "json", + "user", "info", + "--uid", "services:sometest", + ], + check=True, stdout=-1, stderr=-1, + ), + ] # fmt: skip + + assert user.validate() + assert list(user.compare_states()) == [ + "- Differing display_name: RGW has None, directory has 'test test'", + "- Differing access_key: RGW has None, directory has 'dnDlid0jyRs1sK9vEOGV'", + "- Differing key_count: RGW has 0, directory has 1", + ] + + +def test_accounting(subprocess_run): + directory = Mock() + + subprocess_run.side_effect = [ + Mock(stdout=json.dumps(["services:user1"])), + Mock(stdout=json.dumps({"stats": {"total_bytes": 1000}})), + ] + + accounting("test-location", directory) + + assert directory.store_s3.call_args_list == [ + call("test-location", {"services:user1": "1000"}) + ] + + +def test_user_manager(subprocess_run, caplog): + caplog.set_level(logging.INFO) + + directory = Mock() + directory.list_s3_users.return_value = { + "services:sometest": { + "location": "rzob", + "storage_resource_group": "services", + "display_name": "test test", + "access_key": "dnDlid0jyRs1sK9vEOGV", + "secret_key": "VqBfxCqupucBSjo7ksDcf4K6vhgsIdGKnL0ielLi", + "deletion": {"deadline": "", "stages": []}, + } + } + + subprocess_run.side_effect = [ + Mock(stdout=json.dumps(["services:user1"])), + Mock( + stdout=json.dumps( + { + "display_name": "user 1", + "keys": [ + { + "access_key": "some-old-key", + "secret_key": "...", + }, + { + "access_key": "some-other-old-key", + "secret_key": "...", + }, + ], + } + ) + ), + Mock(stdout=""), # user create + Mock(stdout=""), # user update + Mock( + stdout=json.dumps( + { + "display_name": "test test", + "keys": [ + { + "access_key": "dnDlid0jyRs1sK9vEOGV", + "secret_key": "VqBfxCqupucBSjo7ksDcf4K6vhgsIdGKnL0ielLi", + }, + ], + } + ) + ), + ] + + manager = UserManager(directory, "test", "services") + manager.sync_users() + + assert 
directory.update_s3_users.call_args_list == [ + call( + { + "services:sometest": { + "display_name": "test test", + "access_key": "dnDlid0jyRs1sK9vEOGV", + "location": "test", + "storage_resource_group": "services", + "secret_key": None, + }, + "services:user1": { + "display_name": "user 1", + "access_key": "some-old-key", + "location": "test", + "storage_resource_group": "services", + "secret_key": None, + }, + } + ) + ] + + assert ( + " User data mismatch for services:user1:\n- is not known in the directory but exists (unmanaged) in RGW" + in caplog.text + ) diff --git a/pkgs/fc/agent/fc/util/runners.py b/pkgs/fc/agent/fc/util/runners.py index 0b103ab8a..953a4c011 100644 --- a/pkgs/fc/agent/fc/util/runners.py +++ b/pkgs/fc/agent/fc/util/runners.py @@ -1,5 +1,6 @@ # copied from pkgs/fc/ceph/src/fc/ceph/util.py import json +import shlex import subprocess from subprocess import PIPE @@ -67,10 +68,20 @@ def lsblk_linear(self, *args, **kw): result.append(candidate) return result + def radosgw_admin(self, *args, **kw): + return self.__run__("radosgw-admin", "--format", "json", *args, **kw) + def rbd(self, *args, **kw): return self.__run__("rbd", "--format", "json", *args, **kw) +class RedactedValue(str): + def __new__(cls, content): + obj = super().__new__(cls, "") + obj.orig = content + return obj + + class Runner(object): def __init__( self, @@ -82,31 +93,71 @@ def __init__( self.json = JSONRunner(self) + def redacted(self, value): + return RedactedValue(value) + def __getattr__(self, name): name = self.__aliases.get(name, name) def callable(*args, **kw): + silent_errors = kw.pop("silent_errors", lambda *_: False) options = self.default_options.copy() options.update(kw) - print("$", name, " ".join(args), flush=True) + print("$", name, shlex.join(args), flush=True) check = options["check"] + + # Always cause the actual subprocess call to have `check` set + # but if the options passed into this call don't set check, then + # we only resort to logging the error and not 
re-raising it. options["check"] = True + call_args = [ + (arg.orig if isinstance(arg, RedactedValue) else arg) + for arg in args + ] + try: - return subprocess.run((name,) + args, **options).stdout + return subprocess.run( + [ + name, + ] + + call_args, + **options, + ).stdout except subprocess.CalledProcessError as e: - print("> return code:", e.returncode) - print("> stdout:") - print(e.stdout.decode("ascii", errors="replace")) - print("> stderr:") - print(e.stderr.decode("ascii", errors="replace")) + if not silent_errors(e.returncode, e.stdout, e.stderr): + known_secret_values = [ + x.orig for x in args if isinstance(x, RedactedValue) + ] + + print("> return code:", e.returncode) + print("> stdout:") + print( + self._redact_secrets( + known_secret_values, + e.stdout.decode("ascii", errors="replace"), + ) + ) + print("> stderr:") + print( + self._redact_secrets( + known_secret_values, + e.stderr.decode("ascii", errors="replace"), + ) + ) if check: raise return callable + @staticmethod + def _redact_secrets(secrets: list[str], text: str) -> str: + for secret in secrets: + text = text.replace(secret, "") + return text + run = Runner( aliases={ @@ -115,5 +166,6 @@ def callable(*args, **kw): "ceph_mon": "ceph-mon", "ceph_authtool": "ceph-authtool", "mkfs_xfs": "mkfs.xfs", + "radosgw_admin": "radosgw-admin", } ) diff --git a/pkgs/fc/agent/pytest.ini b/pkgs/fc/agent/pytest.ini new file mode 100644 index 000000000..963910ddd --- /dev/null +++ b/pkgs/fc/agent/pytest.ini @@ -0,0 +1,2 @@ +[pytest] +addopts = -vv --cov=fc --cov-report=html diff --git a/pkgs/fc/agent/setup.py b/pkgs/fc/agent/setup.py index 13aa810d5..6b261fe37 100644 --- a/pkgs/fc/agent/setup.py +++ b/pkgs/fc/agent/setup.py @@ -76,7 +76,7 @@ "fc-qemu-scrub=fc.manage.qemu:main", "fc-monitor=fc.manage.monitor:main", "fc-resize-disk=fc.manage.resize_disk:app", - "fc-s3accounting=fc.manage.s3accounting:main", + "fc-s3users=fc.manage.s3users:main", "fc-zones=fc.manage.zones:update", "fctl=fc.util.fctl:app", ], 
diff --git a/pkgs/fc/ceph/src/fc/ceph/util/__init__.py b/pkgs/fc/ceph/src/fc/ceph/util/__init__.py index ac9b087d8..b7959fd59 100644 --- a/pkgs/fc/ceph/src/fc/ceph/util/__init__.py +++ b/pkgs/fc/ceph/src/fc/ceph/util/__init__.py @@ -2,6 +2,7 @@ import ctypes import json import os +import shlex import subprocess import time from subprocess import PIPE @@ -99,7 +100,7 @@ def callable(*args: str, **kw): options.update(kw) console.print( - "$", name, " ".join([str(a) for a in args]), style="grey50" + "$", name, shlex.join([str(a) for a in args]), style="grey50" ) check = options["check"]