Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds RBAC filtering #780

Merged
merged 7 commits into from
Apr 8, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 9 additions & 11 deletions aquarius/app/assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,14 @@
import json
import logging
import os
import requests

from aquarius.app.es_instance import ElasticsearchInstance
from aquarius.app.util import sanitize_record, get_signature_vrs, get_allowed_publishers
from aquarius.app.util import (
sanitize_record,
sanitize_query_result,
get_signature_vrs,
get_allowed_publishers,
)
from aquarius.ddo_checker.shacl_checker import validate_dict
from aquarius.events.processors import (
MetadataCreatedProcessor,
Expand All @@ -20,6 +24,7 @@
from aquarius.log import setup_logging
from aquarius.myapp import app
from aquarius.events.purgatory import Purgatory
from aquarius.rbac import RBAC
from artifacts import ERC721Template
from web3.logs import DISCARD

Expand Down Expand Up @@ -283,7 +288,7 @@ def query_ddo():
)

try:
return es_instance.es.search(data)
return sanitize_query_result(es_instance.es.search(data))
except elasticsearch.exceptions.TransportError as e:
error = e.error if isinstance(e.error, str) else str(e.error)
info = e.info if isinstance(e.info, dict) else ""
Expand Down Expand Up @@ -343,14 +348,7 @@ def validate_remote():
version = data.get("version", None)

if os.getenv("RBAC_SERVER_URL"):
payload = {
"eventType": "validateDDO",
"component": "metadatacache",
"ddo": data,
"browserHeaders": {k: v for k, v in request.headers.items()},
}

valid = requests.post(os.getenv("RBAC_SERVER_URL"), json=payload).json()
valid = RBAC.validate_ddo_rbac(data)

if not valid:
return (
Expand Down
13 changes: 12 additions & 1 deletion aquarius/app/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from web3.main import Web3

from aquarius.app.auth_util import sanitize_addresses
from aquarius.rbac import RBAC

logger = logging.getLogger("aquarius")
keys = KeyAPI(NativeECCBackend)
Expand All @@ -23,7 +24,17 @@ def sanitize_record(data_record):
if "_id" in data_record:
data_record.pop("_id")

return json.dumps(data_record, default=datetime_converter)
if not os.getenv("RBAC_SERVER_URL"):
return json.dumps(data_record, default=datetime_converter)

return RBAC.sanitize_record(data_record)


def sanitize_query_result(query_result):
if not os.getenv("RBAC_SERVER_URL"):
return query_result

return RBAC.sanitize_query_result(query_result)


def get_bool_env_value(envvar_name, default_value=0):
Expand Down
13 changes: 2 additions & 11 deletions aquarius/events/processors.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
from abc import ABC
from datetime import datetime

import requests
from jsonsempai import magic # noqa: F401

from aquarius.ddo_checker.shacl_checker import validate_dict
Expand All @@ -21,6 +20,7 @@
from aquarius.events.decryptor import decrypt_ddo
from aquarius.events.util import make_did, get_dt_factory
from aquarius.graphql import get_number_orders
from aquarius.rbac import RBAC
from artifacts import ERC20Template, ERC721Template
from web3.logs import DISCARD

Expand Down Expand Up @@ -61,17 +61,8 @@ def check_permission(self, publisher_address):
if self.__class__.__name__ == "MetadataCreatedProcessor"
else "update"
)
address = publisher_address
payload = {
"eventType": event_type,
"component": "metadatacache",
"credentials": {"type": "address", "value": address},
}

try:
return requests.post(os.getenv("RBAC_SERVER_URL"), json=payload).json()
except Exception:
return False
return RBAC.check_permission_rbac(event_type, publisher_address)

def add_aqua_data(self, record):
"""Adds keys that are specific to Aquarius, on top of the DDO structure:
Expand Down
82 changes: 82 additions & 0 deletions aquarius/rbac.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
#
# Copyright 2021 Ocean Protocol Foundation
# SPDX-License-Identifier: Apache-2.0
#
import logging
import requests
import os

logger = logging.getLogger("aquarius")


class RBAC:
@staticmethod
def set_headers(request):
RBAC.headers = {k: v for k, v in request.headers.items()}

@staticmethod
def sanitize_record(data_record):
payload = {
"eventType": "filter_single_result",
"component": "metadatacache",
"ddo": data_record,
"browserHeaders": getattr(RBAC, "headers", {}),
}

response = requests.post(os.getenv("RBAC_SERVER_URL"), json=payload)
if response.status_code != 200:
logger.warning(
f"Expected response code 200 from RBAC server, got {response.status_code}."
)

return (
response.json()
if response.status_code == 200 and response.json() is not False
else data_record
)

@staticmethod
def sanitize_query_result(query_result):
payload = {
"eventType": "filter_query_result",
"component": "metadatacache",
"query_result": query_result,
"browserHeaders": getattr(RBAC, "headers", {}),
}

response = requests.post(os.getenv("RBAC_SERVER_URL"), json=payload)

if response.status_code != 200:
logger.warning(
f"Expected response code 200 from RBAC server, got {response.status_code}."
)

return (
response.json()
if response.status_code == 200 and response.json() is not False
else query_result
)

@staticmethod
def validate_ddo_rbac(data):
payload = {
"eventType": "validateDDO",
"component": "metadatacache",
"ddo": data,
"browserHeaders": getattr(RBAC, "headers", {}),
}

return requests.post(os.getenv("RBAC_SERVER_URL"), json=payload).json()

@staticmethod
def check_permission_rbac(event_type, address):
payload = {
"eventType": event_type,
"component": "metadatacache",
"credentials": {"type": "address", "value": address},
}

try:
return requests.post(os.getenv("RBAC_SERVER_URL"), json=payload).json()
except Exception:
return False
10 changes: 9 additions & 1 deletion aquarius/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,10 @@
import configparser

from elasticsearch import Elasticsearch
from flask import jsonify
from flask import jsonify, request
from flask_swagger import swagger
from flask_swagger_ui import get_swaggerui_blueprint
import os

from aquarius.app.assets import assets
from aquarius.app.chains import chains
Expand All @@ -20,11 +21,18 @@
from aquarius.events.events_monitor import EventsMonitor
from aquarius.events.util import setup_web3
from aquarius.myapp import app
from aquarius.rbac import RBAC

config = Config(filename=app.config["AQUARIUS_CONFIG_FILE"])
aquarius_url = config.aquarius_url


@app.before_request
def set_rbac_headers():
if os.getenv("RBAC_SERVER_URL"):
RBAC.set_headers(request)


def get_version():
conf = configparser.ConfigParser()
conf.read(".bumpversion.cfg")
Expand Down
49 changes: 48 additions & 1 deletion tests/test_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@
import json
import logging
import os
from requests.models import Response
from datetime import datetime
from unittest.mock import patch
from unittest.mock import patch, Mock

import pytest

Expand All @@ -16,6 +17,7 @@
datetime_converter,
get_bool_env_value,
sanitize_record,
sanitize_query_result,
get_aquarius_wallet,
AquariusPrivateKeyException,
get_signature_vrs,
Expand Down Expand Up @@ -112,6 +114,51 @@ def test_sanitize_record():
assert result["other_value"] == "something else"


def test_sanitize_record_through_rbac(monkeypatch):
monkeypatch.setenv("RBAC_SERVER_URL", "test")

with patch("requests.post") as mock:
response = Mock(spec=Response)
response.json.return_value = {"this_is": "SPARTAAA!"}
MantisClone marked this conversation as resolved.
Show resolved Hide resolved
response.status_code = 200
mock.return_value = response

result = sanitize_record({})
assert result["this_is"] == "SPARTAAA!"

with patch("requests.post") as mock:
response = Mock(spec=Response)
response.status_code = 404
mock.return_value = response

result = sanitize_record({"this_is": "something else"})
assert result["this_is"] == "something else"


def test_sanitize_query_result(monkeypatch):
result = sanitize_query_result({"this_is": "Athens, for some reason."})
assert result["this_is"] == "Athens, for some reason."

monkeypatch.setenv("RBAC_SERVER_URL", "test")

with patch("requests.post") as mock:
response = Mock(spec=Response)
response.json.return_value = {"this_is": "SPARTAAA!"}
response.status_code = 200
mock.return_value = response

result = sanitize_query_result({})
assert result["this_is"] == "SPARTAAA!"

with patch("requests.post") as mock:
response = Mock(spec=Response)
response.status_code = 404
mock.return_value = response

result = sanitize_query_result({"this_is": "something else"})
assert result["this_is"] == "something else"


class BlockProcessingClassChild(BlockProcessingClass):
def get_last_processed_block(self):
raise Exception("BAD!")
Expand Down