Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add GetEnvironments method to DBus register #3462

Merged
merged 1 commit into from
Nov 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions src/rhsm/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -2059,14 +2059,17 @@ def getServiceLevelList(self, owner_key: str) -> List[str]:
results = self.conn.request_get(method, description=_("Fetching service levels"))
return results

def getEnvironmentList(self, owner_key: str) -> List[dict]:
def getEnvironmentList(self, owner_key: str, list_all: bool = False) -> List[dict]:
"""
List the environments for a particular owner.

Some servers may not support this and will error out. The caller
can always check with supports_resource("environments").
"""
method = "/owners/%s/environments" % self.sanitize(owner_key)
if list_all and self.has_capability("typed_environments"):
method = "/owners/%s/environments?list_all=%r" % (self.sanitize(owner_key), list_all)
else:
method = "/owners/%s/environments" % (self.sanitize(owner_key))
results = self.conn.request_get(method, description=_("Fetching environments"))
return results

Expand Down
62 changes: 62 additions & 0 deletions src/rhsmlib/dbus/objects/register.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from rhsmlib.services.register import RegisterService
from rhsmlib.services.unregister import UnregisterService
from rhsmlib.services.entitlement import EntitlementService
from rhsmlib.services.environment import EnvironmentService
from rhsmlib.client_info import DBusSender
from subscription_manager.cp_provider import CPProvider

Expand All @@ -36,6 +37,14 @@

log = logging.getLogger(__name__)

ENVIRONMENTS_KEYS_TO_FILTER: List[str] = [
"contentPrefix",
"created",
"environmentContent",
"owner",
"updated",
]


class RegisterDBusImplementation(base_object.BaseImplementation):
def __init__(self):
Expand Down Expand Up @@ -181,6 +190,25 @@ def get_organizations(self, options: dict) -> List[dict]:
owners: List[dict] = uep.getOwnerList(options["username"])
return owners

def get_environments(self, options: dict) -> List[dict]:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method should not be in rhsmlib.dbus.objects.register module. This module should contain only methods related to D-Bus. This proposed method should be in new rhsmlib.services.environment module, because you can see that subscription-manager CLI already has environments sub-command. Thus, there will be potential to share new code.

This method should have optional argument uep (basic auth connection to candlepin) to be able to pass existing connection instance to this method.

I this method had another argument typed_environments: bool = True, then we could list not only typed environments. If typed_environments is False, then we could skip detection for that capability on server.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My understanding of the card is that the GetEnvironments() method is intended to be a DBus method. With that said, I moved the actual list call to candlepin to a newly created EnvironmentService class. I then instantiated that in the DBus method and used it to call to candlepin. Is that okay?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I'm misunderstanding your request, I'm happy to make more changes :)

"""Get environments for every org belonging to user

:param options: Connection options including the 'username', 'password', and 'org_id' keys
:return: List of environments
"""
uep: UEPConnection = self.build_uep(options, basic_auth_method=True)
environment_service: EnvironmentService = EnvironmentService(uep)

environments = environment_service.list(options["org_id"])

for environment in environments:
for key in ENVIRONMENTS_KEYS_TO_FILTER:
environment.pop(key, None)
if ("type" not in environment) or (environment["type"] is None):
environment["type"] = ""

return environments

def register_with_credentials(
self, organization: Optional[str], register_options: dict, connection_options: dict
) -> dict:
Expand Down Expand Up @@ -333,6 +361,40 @@ def __init__(self, conn=None, object_path=None, bus_name=None, sender=None, cmd_
self.sender = sender
self.cmd_line = cmd_line

@dbus.service.method(
dbus_interface=constants.PRIVATE_REGISTER_INTERFACE,
in_signature="sssa{sv}s",
out_signature="s",
)
@util.dbus_handle_exceptions
def GetEnvironments(self, username, password, org_id, connection_options, locale):
"""
This method tries to return list of environments in the given orgs. This method also uses
basic authentication method (using username and password).

:param username: string with username used for connection to candlepin server
:param password: string with password
:param org_id: string with org id to list environments for
:param connection_options: dictionary with connection options
:param locale: string with locale
:return: string with json returned by candlepin server
"""
connection_options = dbus_utils.dbus_to_python(connection_options, expected_type=dict)
connection_options["username"] = dbus_utils.dbus_to_python(username, expected_type=str)
connection_options["password"] = dbus_utils.dbus_to_python(password, expected_type=str)
connection_options["org_id"] = dbus_utils.dbus_to_python(org_id, expected_type=str)
locale = dbus_utils.dbus_to_python(locale, expected_type=str)

with DBusSender() as dbus_sender:
dbus_sender.set_cmd_line(sender=self.sender, cmd_line=self.cmd_line)
Locale.set(locale)

environments = self.impl.get_environments(connection_options)

dbus_sender.reset_cmd_line()

return json.dumps(environments)

@dbus.service.method(
dbus_interface=constants.PRIVATE_REGISTER_INTERFACE,
in_signature="ssa{sv}s",
Expand Down
43 changes: 43 additions & 0 deletions src/rhsmlib/services/environment.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
# Copyright (c) 2024 Red Hat, Inc.
#
# This software is licensed to you under the GNU General Public License,
# version 2 (GPLv2). There is NO WARRANTY for this software, express or
# implied, including the implied warranties of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. You should have received a copy of GPLv2
# along with this software; if not, see
# http://www.gnu.org/licenses/old-licenses/gpl-2.0.txt.
#
# Red Hat trademarks are not licensed under GPLv2. No permission is
# granted to use or replicate Red Hat trademarks that are incorporated
# in this software or its documentation.
import logging
from typing import List


from rhsm.connection import UEPConnection

log = logging.getLogger(__name__)


class EnvironmentService:
"""
Class for using environments
"""

def __init__(self, cp: UEPConnection) -> None:
"""
Initialization of EnvironmentService instance
:param cp: connection to Candlepin
"""
self.cp = cp

def list(self, org_id: str) -> List[dict]:
"""
Method for listing environments
:param org_id: organization to list environments for
:return: List of environments.
"""
list_all = self.cp.has_capability("typed_environments")
environments = self.cp.getEnvironmentList(org_id, list_all=list_all)

return environments
86 changes: 86 additions & 0 deletions test/rhsmlib/dbus/test_register.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,77 @@
]
"""

ENVIRONMENTS_CONTENT_JSON = """[
{
"created": "2024-11-07T20:01:47+0000",
"updated": "2024-11-07T20:01:47+0000",
"id": "fake-id",
"name": "test-environment",
"description": "test description",
"contentPrefix": null,
"type": "content-template",
"environmentContent": []
},
{
"created": "2024-11-07T20:01:47+0000",
"updated": "2024-11-07T20:01:47+0000",
"id": "fake-id-2",
"name": "test-environment-2",
"description": "test description",
"contentPrefix": null,
"type": "content-template",
"environmentContent": []
},
{
"created": "2024-11-07T20:01:47+0000",
"updated": "2024-11-07T20:01:47+0000",
"id": "fake-id-3",
"name": "test-environment-3",
"description": "test description",
"contentPrefix": null,
"type": null,
"environmentContent": []
},
{
"created": "2024-11-07T20:01:47+0000",
"updated": "2024-11-07T20:01:47+0000",
"id": "fake-id-4",
"name": "test-environment-4",
"description": "test description",
"contentPrefix": null,
"environmentContent": []
}
]
"""
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add more environments to this list. You will see that new unit test will start to fail.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added more environments to this list (10 more), but I don't see an issue. If you're seeing something else, what else can I do to reproduce?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the old code was used, then the unit test would fail.


ENVIRONMENTS_DBUS_JSON = """[
{
"id": "fake-id",
"name": "test-environment",
"description": "test description",
"type": "content-template"
},
{
"id": "fake-id-2",
"name": "test-environment-2",
"description": "test description",
"type": "content-template"
},
{
"id": "fake-id-3",
"name": "test-environment-3",
"description": "test description",
"type": ""
},
{
"id": "fake-id-4",
"name": "test-environment-4",
"description": "test description",
"type": ""
}
]
"""


class RegisterDBusObjectTest(SubManDBusFixture):
socket_dir: Optional[tempfile.TemporaryDirectory] = None
Expand Down Expand Up @@ -308,6 +379,21 @@ def test_GetOrgs(self):
result = self.impl.get_organizations({"username": "username", "password": "password"})
self.assertEqual(expected, result)

def test_GetEnvironments(self):
self.patches["is_registered"].return_value = False
mock_cp = mock.Mock(spec=connection.UEPConnection, name="UEPConnection")
mock_cp.username = "username"
mock_cp.password = "password"
mock_cp.getEnvironmentList = mock.Mock()
mock_cp.getEnvironmentList.return_value = json.loads(ENVIRONMENTS_CONTENT_JSON)
self.patches["build_uep"].return_value = mock_cp

expected = json.loads(ENVIRONMENTS_DBUS_JSON)
result = self.impl.get_environments(
{"username": "username", "password": "password", "org_id": "org_id"}
)
self.assertEqual(expected, result)

def test_RegisterWithActivationKeys(self):
expected = json.loads(CONSUMER_CONTENT_JSON_SCA)
self.patches["is_registered"].return_value = False
Expand Down
77 changes: 77 additions & 0 deletions test/rhsmlib/services/test_environment.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
# Copyright (c) 2024 Red Hat, Inc.
#
# This software is licensed to you under the GNU General Public License,
# version 2 (GPLv2). There is NO WARRANTY for this software, express or
# implied, including the implied warranties of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. You should have received a copy of GPLv2
# along with this software; if not, see
# http://www.gnu.org/licenses/old-licenses/gpl-2.0.txt.
#
# Red Hat trademarks are not licensed under GPLv2. No permission is
# granted to use or replicate Red Hat trademarks that are incorporated
# in this software or its documentation.
from unittest import mock

from test.rhsmlib.base import InjectionMockingTest

from rhsm import connection

from rhsmlib.services import environment


ENVIRONMENTS_JSON = [
{
"created": "2024-10-03T18:12:56+0000",
"updated": "2024-10-03T18:12:56+0000",
"id": "8bdf14cf9e534119a1fe617c03304768",
"name": "template 1",
"type": "content-template",
"description": "my template",
"owner": {
"id": "8ad980939253781c01925378340e0002",
"key": "content-sources-test",
"displayName": "ContentSourcesTest",
"href": "/owners/content-sources-test",
"contentAccessMode": "org_environment",
},
"environmentContent": [
{"contentId": "11055", "enabled": True},
{"contentId": "56a3a98c76ea4e16bd68424a2c9cc1c1", "enabled": True},
{"contentId": "11049", "enabled": True},
],
},
{
"created": "2024-10-09T19:08:14+0000",
"updated": "2024-10-09T19:08:14+0000",
"id": "6c62889601be41128fe2fece53141fd4",
"name": "template 2",
"type": "content-template",
"description": "my template",
"owner": {
"id": "8ad980939253781c01925378340e0002",
"key": "content-sources-test",
"displayName": "ContentSourcesTest",
"href": "/owners/content-sources-test",
"contentAccessMode": "org_environment",
},
"environmentContent": [
{"contentId": "11055", "enabled": True},
{"contentId": "11049", "enabled": True},
],
},
]


class TestEnvironmentService(InjectionMockingTest):
def setUp(self):
super(TestEnvironmentService, self).setUp()
self.mock_cp = mock.Mock(spec=connection.UEPConnection, name="UEPConnection")

def injection_definitions(self, *args, **kwargs):
return None

def test_list_environments(self):
self.mock_cp.getEnvironmentList.return_value = ENVIRONMENTS_JSON

result = environment.EnvironmentService(self.mock_cp).list("org")
self.assertEqual(ENVIRONMENTS_JSON, result)