Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

COST-716: Finalizing GCP sources and supporting app-specific settings for AWS and Azure #2609

Merged
merged 24 commits into from
Jan 25, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,7 @@ services:
- SOURCES_KAFKA_HOST=${SOURCES_KAFKA_HOST-kafka}
- SOURCES_KAFKA_PORT=${SOURCES_KAFKA_PORT-29092}
- KOKU_SOURCES_CLIENT_PORT=${KOKU_SOURCES_CLIENT_PORT-9000}
- GOOGLE_APPLICATION_CREDENTIALS=${GOOGLE_APPLICATION_CREDENTIALS}
- prometheus_multiproc_dir=/tmp
privileged: true
ports:
Expand Down
9 changes: 6 additions & 3 deletions koku/providers/gcp/provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from googleapiclient.errors import HttpError
from rest_framework import serializers

from ..provider_errors import ProviderErrors
from ..provider_errors import SkipStatusPush
from ..provider_interface import ProviderInterface
from api.common import error_obj
Expand Down Expand Up @@ -70,9 +71,11 @@ def cost_usage_source_is_reachable(self, credentials, data_source):

for required_permission in REQUIRED_IAM_PERMISSIONS:
if required_permission not in permissions:
key = "authentication.project_id"
err_msg = f"Improper IAM permissions: {permissions}."
raise serializers.ValidationError(error_obj(key, err_msg))
key = ProviderErrors.GCP_INCORRECT_IAM_PERMISSIONS
internal_message = f"Improper IAM permissions: {permissions}."
LOG.warning(internal_message)
message = f"Incorrect IAM permissions for project {project}"
raise serializers.ValidationError(error_obj(key, message))

except GoogleCloudError as e:
key = "authentication.project_id"
Expand Down
2 changes: 2 additions & 0 deletions koku/providers/provider_errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ class ProviderErrors:
AZURE_CREDENTAL_UNREACHABLE = "authentication.credentials.unreachable"
AZURE_CLIENT_ERROR = "azure.exception"

GCP_INCORRECT_IAM_PERMISSIONS = "gcp.iam.permissions"

# MESSAGES
INVALID_SOURCE_TYPE_MESSAGE = "The given source type is not supported."

Expand Down
56 changes: 5 additions & 51 deletions koku/sources/api/serializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
"""Sources Model Serializers."""
import copy
import logging
from socket import gaierror
from uuid import uuid4
Expand All @@ -35,9 +34,12 @@
from providers.provider_errors import SkipStatusPush
from sources.api import get_account_from_header
from sources.api import get_auth_header
from sources.storage import _update_authentication
from sources.storage import _update_billing_source
from sources.storage import get_source_instance
from sources.storage import SourcesStorageError


LOG = logging.getLogger(__name__)

ALLOWED_BILLING_SOURCE_PROVIDERS = (
Expand Down Expand Up @@ -90,54 +92,6 @@ def get_source_uuid(self, obj):
"""Get the source_uuid."""
return obj.source_uuid

def _validate_billing_source(self, provider_type, billing_source): # noqa: C901
"""Validate billing source parameters."""
if provider_type == Provider.PROVIDER_AWS:
# TODO: Remove `and not billing_source.get("bucket")` if UI is updated to send "data_source" field
if not billing_source.get("data_source", {}).get("bucket") and not billing_source.get("bucket"):
raise SourcesStorageError("Missing AWS bucket.")
elif provider_type == Provider.PROVIDER_AZURE:
data_source = billing_source.get("data_source")
if not data_source:
raise SourcesStorageError("Missing AZURE data_source.")
if not data_source.get("resource_group"):
raise SourcesStorageError("Missing AZURE resource_group")
if not data_source.get("storage_account"):
raise SourcesStorageError("Missing AZURE storage_account")
elif provider_type == Provider.PROVIDER_GCP:
data_source = billing_source.get("data_source")
if not data_source:
raise SourcesStorageError("Missing GCP data_source.")
if not data_source.get("dataset"):
raise SourcesStorageError("Missing GCP dataset")

def _update_billing_source(self, instance, billing_source):
if instance.source_type not in ALLOWED_BILLING_SOURCE_PROVIDERS:
raise SourcesStorageError(f"Option not supported by source type {instance.source_type}.")
if instance.billing_source.get("data_source"):
billing_copy = copy.deepcopy(instance.billing_source.get("data_source"))
data_source = billing_source.get("data_source", {})
if data_source.get("resource_group") or data_source.get("storage_account"):
billing_copy.update(billing_source.get("data_source"))
billing_source["data_source"] = billing_copy
self._validate_billing_source(instance.source_type, billing_source)
# This if statement can also be removed if UI is updated to send "data_source" field
if instance.source_type in (Provider.PROVIDER_AWS, Provider.PROVIDER_AWS_LOCAL) and not billing_source.get(
"data_source"
):
billing_source = {"data_source": billing_source}
return billing_source

def _update_authentication(self, instance, authentication):
if instance.source_type not in ALLOWED_AUTHENTICATION_PROVIDERS:
raise SourcesStorageError(f"Option not supported by source type {instance.source_type}.")
auth_dict = instance.authentication
if not auth_dict.get("credentials"):
auth_dict["credentials"] = {"subscription_id": None}
subscription_id = authentication.get("credentials", {}).get("subscription_id")
auth_dict["credentials"]["subscription_id"] = subscription_id
return auth_dict

def update(self, instance, validated_data):
"""Update a Provider instance from validated data."""
billing_source = validated_data.get("billing_source")
Expand All @@ -146,10 +100,10 @@ def update(self, instance, validated_data):
try:
with ServerProxy(SOURCES_CLIENT_BASE_URL) as sources_client:
if billing_source:
billing_source = self._update_billing_source(instance, billing_source)
billing_source = _update_billing_source(instance, billing_source)
sources_client.update_billing_source(instance.source_id, billing_source)
if authentication:
authentication = self._update_authentication(instance, authentication)
authentication = _update_authentication(instance, authentication)
sources_client.update_authentication(instance.source_id, authentication)
except Fault as error:
LOG.error(f"Sources update error: {error}")
Expand Down
13 changes: 11 additions & 2 deletions koku/sources/kafka_listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
PROCESS_QUEUE = queue.PriorityQueue()
COUNT = itertools.count() # next(COUNT) returns next sequential number
KAFKA_APPLICATION_CREATE = "Application.create"
KAFKA_APPLICATION_UPDATE = "Application.update"
KAFKA_APPLICATION_DESTROY = "Application.destroy"
KAFKA_AUTHENTICATION_CREATE = "Authentication.create"
KAFKA_AUTHENTICATION_UPDATE = "Authentication.update"
Expand Down Expand Up @@ -106,6 +107,7 @@ def __init__(self, auth_header, source_id):
self.source_uuid = details.get("uid")
self.source_type_name = sources_network.get_source_type_name(self.source_type_id)
self.source_type = SOURCE_PROVIDER_MAP.get(self.source_type_name)
self.app_settings = sources_network.get_application_settings(self.source_type)


def _extract_from_header(headers, header_type):
Expand Down Expand Up @@ -214,7 +216,7 @@ def get_sources_msg_data(msg, app_type_id):
LOG.debug(f"msg value: {str(value)}")
event_type = _extract_from_header(msg.headers(), KAFKA_HDR_EVENT_TYPE)
LOG.debug(f"event_type: {str(event_type)}")
if event_type in (KAFKA_APPLICATION_CREATE, KAFKA_APPLICATION_DESTROY):
if event_type in (KAFKA_APPLICATION_CREATE, KAFKA_APPLICATION_UPDATE, KAFKA_APPLICATION_DESTROY):
if int(value.get("application_type_id")) == app_type_id:
LOG.debug("Application Message: %s", str(msg))
msg_data["event_type"] = event_type
Expand Down Expand Up @@ -352,6 +354,13 @@ def sources_network_info(source_id, auth_header):

storage.add_provider_sources_network_info(src_details, source_id)
save_auth_info(auth_header, source_id)
app_settings = src_details.app_settings
if app_settings:
try:
storage.update_application_settings(source_id, app_settings)
except storage.SourcesStorageError as error:
LOG.error(f"Unable to apply application settings. error: {str(error)}")
return


def cost_mgmt_msg_filter(msg_data):
Expand Down Expand Up @@ -421,7 +430,7 @@ def process_message(app_type_id, msg): # noqa: C901

save_auth_info(msg_data.get("auth_header"), msg_data.get("source_id"))

elif msg_data.get("event_type") in (KAFKA_SOURCE_UPDATE,):
elif msg_data.get("event_type") in (KAFKA_SOURCE_UPDATE, KAFKA_APPLICATION_UPDATE):
if storage.is_known_source(msg_data.get("source_id")) is False:
LOG.info("Update event for unknown source id, skipping...")
return
Expand Down
106 changes: 85 additions & 21 deletions koku/sources/sources_http_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import requests
from requests.exceptions import RequestException

from api.provider.models import Provider
from sources import storage
from sources.config import Config
from sources.sources_error_message import SourcesErrorMessage

Expand Down Expand Up @@ -144,6 +146,77 @@ def get_source_type_name(self, type_id):
source_name = endpoint_response.get("data")[0].get("name")
return source_name

def _build_app_settings_for_gcp(self, app_settings):
"""Build settings structure for gcp."""
billing_source = {}
dataset = app_settings.get("dataset")
if dataset:
billing_source = {"data_source": {}}
billing_source["data_source"]["dataset"] = dataset
return billing_source

def _build_app_settings_for_aws(self, app_settings):
"""Build settings structure for aws."""
billing_source = {}
bucket = app_settings.get("bucket")
if bucket:
billing_source = {"data_source": {}}
billing_source["data_source"]["bucket"] = bucket
return billing_source

def _build_app_settings_for_azure(self, app_settings):
"""Build settings structure for azure."""
billing_source = {}
authentication = {}
resource_group = app_settings.get("resource_group")
storage_account = app_settings.get("storage_account")
subscription_id = app_settings.get("subscription_id")

if resource_group or storage_account:
billing_source = {"data_source": {}}
if resource_group:
billing_source["data_source"]["resource_group"] = resource_group
if storage_account:
billing_source["data_source"]["storage_account"] = storage_account
if subscription_id:
authentication = {"credentials": {}}
authentication["credentials"]["subscription_id"] = subscription_id
return billing_source, authentication

def _update_app_settings_for_source_type(self, source_type, app_settings):
"""Update application settings."""
settings = {}
billing_source = {}
authentication = {}

if source_type in (Provider.PROVIDER_GCP, Provider.PROVIDER_GCP_LOCAL):
billing_source = self._build_app_settings_for_gcp(app_settings)
elif source_type in (Provider.PROVIDER_AWS, Provider.PROVIDER_AWS_LOCAL):
billing_source = self._build_app_settings_for_aws(app_settings)
elif source_type in (Provider.PROVIDER_AZURE, Provider.PROVIDER_AZURE_LOCAL):
billing_source, authentication = self._build_app_settings_for_azure(app_settings)

if billing_source:
settings["billing_source"] = billing_source
if authentication:
settings["authentication"] = authentication

return settings

def get_application_settings(self, source_type):
"""Get the application settings from Sources."""
application_url = "{}/applications?filter[source_id]={}".format(self._base_url, str(self._source_id))
r = self._get_network_response(application_url, self._identity_header, "Unable to application settings")
applications_response = r.json()
if not applications_response.get("data"):
raise SourcesHTTPClientError(f"No application data for source: {self._source_id}")
app_settings = applications_response.get("data")[0].get("extra")

updated_settings = None
if app_settings:
updated_settings = self._update_app_settings_for_source_type(source_type, app_settings)
return updated_settings

def get_aws_credentials(self):
"""Get the roleARN from Sources Authentication service."""
url = "{}/applications?filter[source_id]={}".format(self._base_url, str(self._source_id))
Expand Down Expand Up @@ -186,24 +259,15 @@ def get_gcp_credentials(self):
else:
raise SourcesHTTPClientError(f"Unable to get GCP credentials for Source: {self._source_id}")

authentications_str = "{}/authentications?[authtype]=project_id&[resource_id]={}"
authentications_str = "{}/authentications?[authtype]=project_id_service_account_json&[resource_id]={}"
authentications_url = authentications_str.format(self._base_url, str(resource_id))
r = self._get_network_response(authentications_url, self._identity_header, "Unable to GCP credentials")
authentications_response = r.json()
if not authentications_response.get("data"):
raise SourcesHTTPClientError(f"Unable to get GCP credentials for Source: {self._source_id}")
authentications_id = authentications_response.get("data")[0].get("id")

authentications_internal_url = "{}/authentications/{}?expose_encrypted_attribute[]=password".format(
self._internal_url, str(authentications_id)
)
r = self._get_network_response(
authentications_internal_url, self._identity_header, "Unable to GCP Credentials"
)
authentications_internal_response = r.json()
password = authentications_internal_response.get("password")
if password:
return {"project_id": password}
project_id = authentications_response.get("data")[0].get("username")
if project_id:
return {"project_id": project_id}

raise SourcesHTTPClientError(f"Unable to get GCP credentials for Source: {self._source_id}")

Expand Down Expand Up @@ -318,12 +382,12 @@ def set_source_status(self, error_msg, cost_management_type_id=None):
application_url = f"{self._base_url}/applications/{str(application_id)}"

json_data = self.build_source_status(error_msg)

application_response = requests.patch(application_url, json=json_data, headers=status_header)
if application_response.status_code != 204:
raise SourcesHTTPClientError(
f"Unable to set status for Source {self._source_id}. Reason: "
f"Status code: {application_response.status_code}. Response: {application_response.text}."
)
return True
if storage.save_status(self._source_id, json_data):
application_response = requests.patch(application_url, json=json_data, headers=status_header)
if application_response.status_code != 204:
raise SourcesHTTPClientError(
f"Unable to set status for Source {self._source_id}. Reason: "
f"Status code: {application_response.status_code}. Response: {application_response.text}."
)
return True
return False
Loading