From 6e4fb4b690f8dbe1447c75a4e062b905e7eabff7 Mon Sep 17 00:00:00 2001 From: Sergei Tsaplin Date: Mon, 28 Sep 2020 18:38:15 +0300 Subject: [PATCH] Password-based authentication support --- .gitignore | 3 + .travis.yml | 2 +- aioetcd3/auth.py | 7 +- aioetcd3/base.py | 69 ++++++++++++++- aioetcd3/client.py | 23 +++-- aioetcd3/cluster.py | 2 +- aioetcd3/exceptions.py | 41 +++++++++ aioetcd3/utils.py | 16 +++- aioetcd3/watch.py | 4 +- setup.py | 2 +- test/test_auth.py | 193 ++++++++++++++++++++++++++++++++++++++++- 11 files changed, 339 insertions(+), 23 deletions(-) create mode 100644 aioetcd3/exceptions.py diff --git a/.gitignore b/.gitignore index 7bbc71c..7ebd525 100644 --- a/.gitignore +++ b/.gitignore @@ -99,3 +99,6 @@ ENV/ # mypy .mypy_cache/ + +# idea +.idea/ \ No newline at end of file diff --git a/.travis.yml b/.travis.yml index ea66a80..e2ee8e3 100644 --- a/.travis.yml +++ b/.travis.yml @@ -17,8 +17,8 @@ install: - pip install -r requirements.txt - docker run -d -p 2379:2379 -p 2380:2380 --name etcd-v3.2 --volume=/tmp/etcd-data:/etcd-data gcr.io/etcd-development/etcd:v3.2 /usr/local/bin/etcd --name my-etcd-1 --data-dir /etcd-data --listen-client-urls http://0.0.0.0:2379 --advertise-client-urls http://0.0.0.0:2379 --listen-peer-urls http://0.0.0.0:2380 --initial-advertise-peer-urls http://0.0.0.0:2380 --initial-cluster my-etcd-1=http://0.0.0.0:2380 --initial-cluster-token 123456789 --initial-cluster-state new --auto-compaction-retention 1 - docker run -d -p 2378:2379 -p 2381:2380 --name etcd-v3.2-auth --volume=/tmp/etcd-data2:/etcd-data --volume=`pwd`/test/cfssl:/cfssl gcr.io/etcd-development/etcd:v3.2 /usr/local/bin/etcd --name my-etcd-1 --data-dir /etcd-data --listen-client-urls https://0.0.0.0:2379 --advertise-client-urls https://0.0.0.0:2378 --client-cert-auth --trusted-ca-file=/cfssl/ca.pem --cert-file=/cfssl/server.pem --key-file=/cfssl/server-key.pem + - docker run -d -p 2377:2379 -p 2382:2380 --name etcd-v3.2-ssl --volume=/tmp/etcd-data3:/etcd-data --volume=`pwd`/test/cfssl:/cfssl gcr.io/etcd-development/etcd:v3.2 /usr/local/bin/etcd --name my-etcd-2 --data-dir /etcd-data --listen-client-urls https://0.0.0.0:2379 --advertise-client-urls https://0.0.0.0:2377 --cert-file=/cfssl/server.pem --key-file=/cfssl/server-key.pem -script: script: - pwd - coverage run -m unittest discover -v diff --git a/aioetcd3/auth.py b/aioetcd3/auth.py index c35955c..6de64e7 100644 --- a/aioetcd3/auth.py +++ b/aioetcd3/auth.py @@ -7,12 +7,12 @@ import aioetcd3._etcdv3.rpc_pb2_grpc as stub -def call_grpc(request, response_func, method): +def call_grpc(request, response_func, method, skip_auth=False): def _f(f): @functools.wraps(f) async def call(self, *args, **kwargs): - r = await self.grpc_call(method(self), request(*args, **kwargs)) + r = await self.grpc_call(method(self), request(*args, **kwargs), skip_auth=skip_auth) return response_func(r) return call @@ -33,8 +33,9 @@ async def auth_enable(self): async def auth_disable(self): pass + # The method should be called without password authentication to avoid the infinite recursion @call_grpc(lambda username, password: rpc.AuthenticateRequest(name=username, password=password), - lambda r: r.token, lambda s: s._auth_stub.Authenticate) + lambda r: r.token, lambda s: s._auth_stub.Authenticate, skip_auth=True) async def authenticate(self, username, password): pass diff --git a/aioetcd3/base.py b/aioetcd3/base.py index 0ae8d0e..ff39846 100644 --- a/aioetcd3/base.py +++ b/aioetcd3/base.py @@ -1,12 +1,44 @@ +import asyncio +from grpc import ( + metadata_call_credentials, AuthMetadataPlugin, RpcError, StatusCode +) + +from .exceptions import AuthError, STATUS_MAP + + _default_timeout = object() + +class _EtcdTokenCallCredentials(AuthMetadataPlugin): + + def __init__(self, access_token): + self._access_token = access_token + + def __call__(self, context, callback): + metadata = (("token", self._access_token),) + callback(metadata, None) + + class StubMixin(object): - def __init__(self, channel, timeout): + def __init__(self, channel, timeout, username=None, password=None): + self.username = username + self.password = password self.channel = channel self.timeout = timeout + self._auth_lock = asyncio.Lock() self.last_response_info = None + self._metadata = None + self._call_credentials = None self._update_channel(channel) + async def _authenticate(self): + async with self._auth_lock: # Avoiding concurrent authentications for the client instance + if self._metadata is not None: # Avoiding double authentication + return + token = await self.authenticate(username=self.username, password=self.password) + self._metadata = (("token", token),) + self._call_credentials = metadata_call_credentials(_EtcdTokenCallCredentials(token)) + def _update_channel(self, channel): self.channel = channel self._loop = channel._loop @@ -17,9 +49,40 @@ def _update_cluster_info(self, header): def get_cluster_info(self): return self.last_response_info - async def grpc_call(self, stub_func, request, timeout=_default_timeout): + async def grpc_call(self, stub_func, request, timeout=_default_timeout, skip_auth=False): if timeout is _default_timeout: timeout = self.timeout - response = await stub_func(request, timeout=timeout) + + # If the username and password are set, trying to call the auth.authenticate + # method to get the auth token. If the token already received - just use it. + if self.username is not None and self.password is not None and not skip_auth: + if self._metadata is None: # We need to call self._authenticate for the first rpc call only + try: + await self._authenticate() + except RpcError as exc: + if exc._state.code == StatusCode.INVALID_ARGUMENT: + raise AuthError(exc._state.details, exc._state.debug_error_string) + raise exc + + try: + response = await stub_func( + request, timeout=timeout, credentials=self._call_credentials, metadata=self._metadata + ) + except RpcError as exc: + _process_rpc_error(exc) self._update_cluster_info(response.header) return response + + +def _process_rpc_error(exc: RpcError): + """Wraps grpc.RpcError to a specific library's exception. + If there is no specific exception found in the map, the original + exception will be raised + """ + try: + new_exc = STATUS_MAP.get(exc._state.code) + if new_exc is not None: + raise new_exc(exc._state.details, exc._state.debug_error_string) + except AttributeError: + pass + raise exc diff --git a/aioetcd3/client.py b/aioetcd3/client.py index 061cc2d..213ea04 100644 --- a/aioetcd3/client.py +++ b/aioetcd3/client.py @@ -1,5 +1,6 @@ import aiogrpc import os +import logging from aiogrpc.channel import Channel from aioetcd3.kv import KV from aioetcd3.lease import Lease @@ -9,11 +10,14 @@ from aioetcd3.cluster import Cluster from aioetcd3.utils import get_secure_creds +logger = logging.getLogger(__name__) + class Client(KV, Lease, Auth, Watch, Maintenance, Cluster): def __init__(self, endpoint, ssl=False, ca_cert=None, cert_key=None, cert_cert=None, - default_ca=False, grpc_options = None, timeout=5, + default_ca=False, grpc_options=None, timeout=5, + username=None, password=None, *, loop=None, executor=None): channel = self._create_grpc_channel(endpoint=endpoint, ssl=ssl, ca_cert=ca_cert, @@ -22,7 +26,9 @@ def __init__(self, endpoint, ssl=False, options=grpc_options, loop=loop, executor=executor) - super().__init__(channel, timeout) + if cert_key and cert_cert and username and password: + logger.warning("Certificate and password authentication methods are used simultaneously") + super().__init__(channel, timeout, username=username, password=password) def update_server_list(self, endpoint): self.close() @@ -41,6 +47,8 @@ def _create_grpc_channel(self, endpoint, ssl=False, ca_cert = None else: if ca_cert is None: + logger.warning("Certificate authority is not specified. Empty CA will be used. To use system CA set" + " `default_ca=True`") ca_cert = '' # to ensure ssl connect , set grpc env @@ -59,6 +67,8 @@ def _create_grpc_channel(self, endpoint, ssl=False, return channel def _recreate_grpc_channel(self, endpoint): + self._call_credentials = None + self._metadata = None if self._credentials: channel = aiogrpc.secure_channel(endpoint, self._credentials, options=self._options, loop=self._loop, executor=self._executor, @@ -72,17 +82,18 @@ def close(self): return self.channel.close() -def client(endpoint, grpc_options=None, timeout=None): +def client(endpoint, grpc_options=None, timeout=None, username=None, password=None): # user `ip:port,ip:port` to user grpc balance - return Client(endpoint, grpc_options=grpc_options, timeout=timeout) + return Client(endpoint, grpc_options=grpc_options, username=username, password=password, timeout=timeout) def ssl_client(endpoint, ca_file=None, cert_file=None, key_file=None, default_ca=False, grpc_options=None, - timeout=None): + timeout=None, username=None, password=None): ca, key, cert = get_secure_creds(ca_cert=ca_file, cert_cert=cert_file, cert_key=key_file) return Client(endpoint, ssl=True, ca_cert=ca, cert_key=key, cert_cert=cert, - default_ca=default_ca, grpc_options=grpc_options, timeout=timeout) + default_ca=default_ca, grpc_options=grpc_options, timeout=timeout, + username=username, password=password) def set_grpc_cipher(enable_rsa=True, enable_ecdsa=True, ciphers=None): diff --git a/aioetcd3/cluster.py b/aioetcd3/cluster.py index b9ab937..6a7315c 100644 --- a/aioetcd3/cluster.py +++ b/aioetcd3/cluster.py @@ -72,7 +72,7 @@ async def member_healthy(self, members=None): channel = aiogrpc.insecure_channel(server_endpoint, options=self._options, loop=self._loop, executor=self._executor, standalone_pool_for_streaming=True) try: - maintenance = Maintenance(channel=channel, timeout=2) + maintenance = Maintenance(channel=channel, timeout=2, username=self.username, password=self.password) try: await maintenance.status() except grpc.RpcError: diff --git a/aioetcd3/exceptions.py b/aioetcd3/exceptions.py new file mode 100644 index 0000000..bbff156 --- /dev/null +++ b/aioetcd3/exceptions.py @@ -0,0 +1,41 @@ +# All of the custom errors are inherited from the grpc.RpcError +# for the backward compatibility +from grpc import RpcError, StatusCode + + +class EtcdError(RpcError): + code = StatusCode.UNKNOWN + + def __init__(self, details, debug_info=None): + self.details = details + self.debug_info = debug_info + + def __repr__(self): + return "`{}`: reason: `{}`".format(self.code, self.details) + + +class AuthError(EtcdError): + code = StatusCode.INVALID_ARGUMENT + + +class Unauthenticated(EtcdError): + code = StatusCode.UNAUTHENTICATED + + +class InvalidArgument(EtcdError): + code = StatusCode.INVALID_ARGUMENT + + +class PermissionDenied(EtcdError): + code = StatusCode.PERMISSION_DENIED + + +class FailedPrecondition(EtcdError): + code = StatusCode.FAILED_PRECONDITION + + +STATUS_MAP = { + StatusCode.UNAUTHENTICATED: Unauthenticated, + StatusCode.PERMISSION_DENIED: PermissionDenied, + StatusCode.FAILED_PRECONDITION: FailedPrecondition, +} diff --git a/aioetcd3/utils.py b/aioetcd3/utils.py index 18e81d3..4a23cc4 100644 --- a/aioetcd3/utils.py +++ b/aioetcd3/utils.py @@ -66,8 +66,16 @@ def dns_endpoint(dns_name): def get_secure_creds(ca_cert, cert_key, cert_cert): - with open(ca_cert, 'rb') as ca_cert_file: + ca_cert_value = None + cert_key_value = None + cert_value = None + if ca_cert is not None: + with open(ca_cert, 'rb') as ca_cert_file: + ca_cert_value = ca_cert_file.read() + if cert_key is not None: with open(cert_key, 'rb') as cert_key_file: - with open(cert_cert, 'rb') as cert_cert_file: - return ca_cert_file.read(), cert_key_file.read(), cert_cert_file.read() - + cert_key_value = cert_key_file.read() + if cert_cert is not None: + with open(cert_cert, 'rb') as cert_cert_file: + cert_value = cert_cert_file.read() + return ca_cert_value, cert_key_value, cert_value diff --git a/aioetcd3/watch.py b/aioetcd3/watch.py index 5b698d9..599900b 100644 --- a/aioetcd3/watch.py +++ b/aioetcd3/watch.py @@ -188,14 +188,14 @@ def _clone(self): class Watch(StubMixin): - def __init__(self, channel, timeout): + def __init__(self, channel, timeout, username=None, password=None): # Put (WatchCreateRequest, output_queue, done_future) to create a watch self._create_request_queue = _Pipe(5, loop=self._loop) # Put (output_queue, done_future) to cancel a watch self._cancel_request_queue = _Pipe(loop=self._loop) self._reconnect_event = asyncio.Event(loop=self._loop) self._watch_task_running = None - super().__init__(channel, timeout) + super().__init__(channel, timeout, username=username, password=password) async def _watch_task(self, reconnect_event): # Queue for WatchRequest diff --git a/setup.py b/setup.py index bbf7088..3e2d174 100644 --- a/setup.py +++ b/setup.py @@ -2,7 +2,7 @@ from setuptools import setup, find_packages -version = "1.11" +version = "1.12" try: import pypandoc diff --git a/test/test_auth.py b/test/test_auth.py index 239fb67..fdcca81 100644 --- a/test/test_auth.py +++ b/test/test_auth.py @@ -4,6 +4,7 @@ from aioetcd3.client import client, ssl_client, set_grpc_cipher from aioetcd3.help import range_all, PER_RW +from aioetcd3.exceptions import AuthError, Unauthenticated, PermissionDenied def asynctest(f): @@ -13,7 +14,9 @@ def _f(self): return _f + TEST_USER_NAME = 'test' +TEST_USER_PASSWORD = "test" TEST_ROLE_NAME = 'admin' @@ -46,7 +49,7 @@ async def test_auth_1(self): roles = await self.client.user_get(username=TEST_USER_NAME) self.assertEqual(len(roles), 0) - await self.client.user_change_password(username=TEST_USER_NAME, password="test") + await self.client.user_change_password(username=TEST_USER_NAME, password=TEST_USER_PASSWORD) await self.client.user_delete(username=TEST_USER_NAME) @@ -65,7 +68,7 @@ async def test_auth_2(self): @asynctest async def test_auth_3(self): - await self.client.user_add(username=TEST_USER_NAME, password="test") + await self.client.user_add(username=TEST_USER_NAME, password=TEST_USER_PASSWORD) with self.assertRaises(Exception): await self.client.user_grant_role(username=TEST_USER_NAME, role=TEST_ROLE_NAME) @@ -142,3 +145,189 @@ async def cleanUp(self): async def tearDown(self): await self.cleanUp() await self.client.close() + + +class PasswordAuthTest(unittest.TestCase): + @asynctest + async def setUp(self): + self.endpoints = "127.0.0.1:2379" + self.unauthenticated_client = client(endpoint=self.endpoints) + await self.cleanUp() + await self.prepare_users_and_roles() + self.client_client = client( + endpoint=self.endpoints, username="client", password="client" + ) + self.root_client = client(endpoint=self.endpoints, username="root", password="root") + + async def prepare_users_and_roles(self): + await self.unauthenticated_client.user_add(username="root", password="root") + await self.unauthenticated_client.role_add(name='root') + await self.unauthenticated_client.user_grant_role(username='root', role='root') + + await self.unauthenticated_client.user_add(username='client', password='client') + await self.unauthenticated_client.role_add(name='client') + await self.unauthenticated_client.user_grant_role(username='client', role='client') + await self.unauthenticated_client.auth_enable() + + async def create_kv_for_test(self): + await self.root_client.put('/foo', '/foo') + value, meta = await self.root_client.get('/foo') + self.assertEqual(value, b'/foo') + + @asynctest + async def test_auth_1(self): + await self.create_kv_for_test() + + with self.assertRaises(PermissionDenied): + await self.client_client.get('/foo') + + await self.root_client.role_grant_permission(name='client', key_range='/foo', permission=PER_RW) + value, meta = await self.client_client.get('/foo') + self.assertEqual(value, b'/foo') + + await self.client_client.put('/foo', 'ssss') + + @asynctest + async def test_wrong_password(self): + wrong_password_client = client(endpoint=self.endpoints, username="client", password="wrong_password") + with self.assertRaises(AuthError) as exc: + await wrong_password_client.get("/foo") + assert repr(exc.exception) == "`{}`: reason: `{}`".format(exc.exception.code, exc.exception.details) + + @asynctest + async def test_wrong_token(self): + await self.create_kv_for_test() + await self.root_client.role_grant_permission(name='client', key_range='/foo', permission=PER_RW) + + new_client = client(endpoint=self.endpoints, username="client", password="client") + value, meta = await self.client_client.get('/foo') + self.assertEqual(value, b'/foo') + + # Put invalid token + new_client._metadata = (("token", "invalid_token"),) + with self.assertRaises(Unauthenticated) as exc: + await new_client.get("/foo") + + async def delete_all_users(self): + # It's necessary to use unauthenticated client here, because root user + # cannot be deleted when the auth is enabled + users = await self.unauthenticated_client.user_list() + + for u in users: + await self.unauthenticated_client.user_delete(username=u) + + async def delete_all_roles(self): + # It's necessary to use unauthenticated client here, because root user + # cannot be deleted when the auth is enabled + roles = await self.unauthenticated_client.role_list() + + for r in roles: + await self.unauthenticated_client.role_delete(name=r) + + async def cleanUp(self): + await self.unauthenticated_client.delete(range_all()) + await self.delete_all_users() + await self.delete_all_roles() + + @asynctest + async def tearDown(self): + await self.root_client.auth_disable() + await self.cleanUp() + + +class PasswordAuthWithSslTest(unittest.TestCase): + @asynctest + async def setUp(self): + self.endpoints = "127.0.0.1:2377" + self.unauthenticated_client = ssl_client( + endpoint=self.endpoints, + ca_file="test/cfssl/ca.pem", + ) + await self.cleanUp() + await self.prepare_users_and_roles() + self.root_client = ssl_client(endpoint=self.endpoints, ca_file="test/cfssl/ca.pem", + username="root", password="root") + + self.client_client = ssl_client(endpoint=self.endpoints, ca_file="test/cfssl/ca.pem", + username="client", password="client") + + async def prepare_users_and_roles(self): + await self.unauthenticated_client.user_add(username="root", password="root") + await self.unauthenticated_client.role_add(name='root') + await self.unauthenticated_client.user_grant_role(username='root', role='root') + + await self.unauthenticated_client.user_add(username='client', password='client') + await self.unauthenticated_client.role_add(name='client') + await self.unauthenticated_client.user_grant_role(username='client', role='client') + await self.unauthenticated_client.auth_enable() + + async def create_kv_for_test(self): + await self.root_client.put('/foo', '/foo') + value, meta = await self.root_client.get('/foo') + self.assertEqual(value, b'/foo') + + @asynctest + async def test_auth_1(self): + await self.create_kv_for_test() + + with self.assertRaises(PermissionDenied): + await self.client_client.get('/foo') + + await self.root_client.role_grant_permission(name='client', key_range='/foo', permission=PER_RW) + value, meta = await self.client_client.get('/foo') + self.assertEqual(value, b'/foo') + + await self.client_client.put('/foo', 'ssss') + + @asynctest + async def test_wrong_password(self): + wrong_password_client = ssl_client( + endpoint=self.endpoints, ca_file="test/cfssl/ca.pem", + username="client", password="wrong_password" + ) + with self.assertRaises(AuthError) as exc: + await wrong_password_client.get("/foo") + assert repr(exc.exception) == "`{}`: reason: `{}`".format(exc.exception.code, exc.exception.details) + + @asynctest + async def test_wrong_token(self): + await self.create_kv_for_test() + await self.root_client.role_grant_permission(name='client', key_range='/foo', permission=PER_RW) + + new_client = ssl_client( + endpoint=self.endpoints, ca_file="test/cfssl/ca.pem", + username="root", password="root" + ) + value, meta = await new_client.get('/foo') + self.assertEqual(value, b'/foo') + + # Put invalid token + new_client._metadata = (("token", "invalid_token"),) + with self.assertRaises(Unauthenticated) as exc: + await new_client.get("/foo") + + async def delete_all_users(self): + # It's necessary to use unauthenticated client here, because root user + # cannot be deleted when the auth is enabled + users = await self.unauthenticated_client.user_list() + + for u in users: + await self.unauthenticated_client.user_delete(username=u) + + async def delete_all_roles(self): + # It's necessary to use unauthenticated client here, because root user + # cannot be deleted when the auth is enabled + roles = await self.unauthenticated_client.role_list() + + for r in roles: + await self.unauthenticated_client.role_delete(name=r) + + async def cleanUp(self): + await self.unauthenticated_client.delete(range_all()) + await self.delete_all_users() + await self.delete_all_roles() + + @asynctest + async def tearDown(self): + await self.root_client.auth_disable() + await self.cleanUp()