Skip to content

Commit

Permalink
Merge pull request #18 from SergeyTsaplin/password-auth
Browse files Browse the repository at this point in the history
Password-based authentication support
  • Loading branch information
gaopeiliang authored Sep 30, 2020
2 parents b17f8c9 + 6e4fb4b commit 3274ab4
Show file tree
Hide file tree
Showing 11 changed files with 339 additions and 23 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -99,3 +99,6 @@ ENV/

# mypy
.mypy_cache/

# idea
.idea/
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ install:
- pip install -r requirements.txt
- docker run -d -p 2379:2379 -p 2380:2380 --name etcd-v3.2 --volume=/tmp/etcd-data:/etcd-data gcr.io/etcd-development/etcd:v3.2 /usr/local/bin/etcd --name my-etcd-1 --data-dir /etcd-data --listen-client-urls http://0.0.0.0:2379 --advertise-client-urls http://0.0.0.0:2379 --listen-peer-urls http://0.0.0.0:2380 --initial-advertise-peer-urls http://0.0.0.0:2380 --initial-cluster my-etcd-1=http://0.0.0.0:2380 --initial-cluster-token 123456789 --initial-cluster-state new --auto-compaction-retention 1
- docker run -d -p 2378:2379 -p 2381:2380 --name etcd-v3.2-auth --volume=/tmp/etcd-data2:/etcd-data --volume=`pwd`/test/cfssl:/cfssl gcr.io/etcd-development/etcd:v3.2 /usr/local/bin/etcd --name my-etcd-1 --data-dir /etcd-data --listen-client-urls https://0.0.0.0:2379 --advertise-client-urls https://0.0.0.0:2378 --client-cert-auth --trusted-ca-file=/cfssl/ca.pem --cert-file=/cfssl/server.pem --key-file=/cfssl/server-key.pem
- docker run -d -p 2377:2379 -p 2382:2380 --name etcd-v3.2-ssl --volume=/tmp/etcd-data3:/etcd-data --volume=`pwd`/test/cfssl:/cfssl gcr.io/etcd-development/etcd:v3.2 /usr/local/bin/etcd --name my-etcd-2 --data-dir /etcd-data --listen-client-urls https://0.0.0.0:2379 --advertise-client-urls https://0.0.0.0:2377 --cert-file=/cfssl/server.pem --key-file=/cfssl/server-key.pem

script:
script:
- pwd
- coverage run -m unittest discover -v
Expand Down
7 changes: 4 additions & 3 deletions aioetcd3/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@
import aioetcd3._etcdv3.rpc_pb2_grpc as stub


def call_grpc(request, response_func, method):
def call_grpc(request, response_func, method, skip_auth=False):

def _f(f):
@functools.wraps(f)
async def call(self, *args, **kwargs):
r = await self.grpc_call(method(self), request(*args, **kwargs))
r = await self.grpc_call(method(self), request(*args, **kwargs), skip_auth=skip_auth)
return response_func(r)

return call
Expand All @@ -33,8 +33,9 @@ async def auth_enable(self):
async def auth_disable(self):
pass

# The method should be called without password authentication to avoid the infinite recursion
@call_grpc(lambda username, password: rpc.AuthenticateRequest(name=username, password=password),
lambda r: r.token, lambda s: s._auth_stub.Authenticate)
lambda r: r.token, lambda s: s._auth_stub.Authenticate, skip_auth=True)
async def authenticate(self, username, password):
pass

Expand Down
69 changes: 66 additions & 3 deletions aioetcd3/base.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,44 @@
import asyncio
from grpc import (
metadata_call_credentials, AuthMetadataPlugin, RpcError, StatusCode
)

from .exceptions import AuthError, STATUS_MAP


_default_timeout = object()


class _EtcdTokenCallCredentials(AuthMetadataPlugin):

def __init__(self, access_token):
self._access_token = access_token

def __call__(self, context, callback):
metadata = (("token", self._access_token),)
callback(metadata, None)


class StubMixin(object):
def __init__(self, channel, timeout):
def __init__(self, channel, timeout, username=None, password=None):
self.username = username
self.password = password
self.channel = channel
self.timeout = timeout
self._auth_lock = asyncio.Lock()
self.last_response_info = None
self._metadata = None
self._call_credentials = None
self._update_channel(channel)

async def _authenticate(self):
async with self._auth_lock: # Avoiding concurrent authentications for the client instance
if self._metadata is not None: # Avoiding double authentication
return
token = await self.authenticate(username=self.username, password=self.password)
self._metadata = (("token", token),)
self._call_credentials = metadata_call_credentials(_EtcdTokenCallCredentials(token))

def _update_channel(self, channel):
self.channel = channel
self._loop = channel._loop
Expand All @@ -17,9 +49,40 @@ def _update_cluster_info(self, header):
def get_cluster_info(self):
return self.last_response_info

async def grpc_call(self, stub_func, request, timeout=_default_timeout):
async def grpc_call(self, stub_func, request, timeout=_default_timeout, skip_auth=False):
if timeout is _default_timeout:
timeout = self.timeout
response = await stub_func(request, timeout=timeout)

# If the username and password are set, trying to call the auth.authenticate
# method to get the auth token. If the token already received - just use it.
if self.username is not None and self.password is not None and not skip_auth:
if self._metadata is None: # We need to call self._authenticate for the first rpc call only
try:
await self._authenticate()
except RpcError as exc:
if exc._state.code == StatusCode.INVALID_ARGUMENT:
raise AuthError(exc._state.details, exc._state.debug_error_string)
raise exc

try:
response = await stub_func(
request, timeout=timeout, credentials=self._call_credentials, metadata=self._metadata
)
except RpcError as exc:
_process_rpc_error(exc)
self._update_cluster_info(response.header)
return response


def _process_rpc_error(exc: RpcError):
"""Wraps grpc.RpcError to a specific library's exception.
If there is no specific exception found in the map, the original
exception will be raised
"""
try:
new_exc = STATUS_MAP.get(exc._state.code)
if new_exc is not None:
raise new_exc(exc._state.details, exc._state.debug_error_string)
except AttributeError:
pass
raise exc
23 changes: 17 additions & 6 deletions aioetcd3/client.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import aiogrpc
import os
import logging
from aiogrpc.channel import Channel
from aioetcd3.kv import KV
from aioetcd3.lease import Lease
Expand All @@ -9,11 +10,14 @@
from aioetcd3.cluster import Cluster
from aioetcd3.utils import get_secure_creds

logger = logging.getLogger(__name__)


class Client(KV, Lease, Auth, Watch, Maintenance, Cluster):
def __init__(self, endpoint, ssl=False,
ca_cert=None, cert_key=None, cert_cert=None,
default_ca=False, grpc_options = None, timeout=5,
default_ca=False, grpc_options=None, timeout=5,
username=None, password=None,
*, loop=None, executor=None):
channel = self._create_grpc_channel(endpoint=endpoint, ssl=ssl,
ca_cert=ca_cert,
Expand All @@ -22,7 +26,9 @@ def __init__(self, endpoint, ssl=False,
options=grpc_options,
loop=loop,
executor=executor)
super().__init__(channel, timeout)
if cert_key and cert_cert and username and password:
logger.warning("Certificate and password authentication methods are used simultaneously")
super().__init__(channel, timeout, username=username, password=password)

def update_server_list(self, endpoint):
self.close()
Expand All @@ -41,6 +47,8 @@ def _create_grpc_channel(self, endpoint, ssl=False,
ca_cert = None
else:
if ca_cert is None:
logger.warning("Certificate authority is not specified. Empty CA will be used. To use system CA set"
" `default_ca=True`")
ca_cert = ''

# to ensure ssl connect , set grpc env
Expand All @@ -59,6 +67,8 @@ def _create_grpc_channel(self, endpoint, ssl=False,
return channel

def _recreate_grpc_channel(self, endpoint):
self._call_credentials = None
self._metadata = None
if self._credentials:
channel = aiogrpc.secure_channel(endpoint, self._credentials, options=self._options,
loop=self._loop, executor=self._executor,
Expand All @@ -72,17 +82,18 @@ def close(self):
return self.channel.close()


def client(endpoint, grpc_options=None, timeout=None):
def client(endpoint, grpc_options=None, timeout=None, username=None, password=None):

# user `ip:port,ip:port` to user grpc balance
return Client(endpoint, grpc_options=grpc_options, timeout=timeout)
return Client(endpoint, grpc_options=grpc_options, username=username, password=password, timeout=timeout)


def ssl_client(endpoint, ca_file=None, cert_file=None, key_file=None, default_ca=False, grpc_options=None,
timeout=None):
timeout=None, username=None, password=None):
ca, key, cert = get_secure_creds(ca_cert=ca_file, cert_cert=cert_file, cert_key=key_file)
return Client(endpoint, ssl=True, ca_cert=ca, cert_key=key, cert_cert=cert,
default_ca=default_ca, grpc_options=grpc_options, timeout=timeout)
default_ca=default_ca, grpc_options=grpc_options, timeout=timeout,
username=username, password=password)


def set_grpc_cipher(enable_rsa=True, enable_ecdsa=True, ciphers=None):
Expand Down
2 changes: 1 addition & 1 deletion aioetcd3/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ async def member_healthy(self, members=None):
channel = aiogrpc.insecure_channel(server_endpoint, options=self._options, loop=self._loop,
executor=self._executor, standalone_pool_for_streaming=True)
try:
maintenance = Maintenance(channel=channel, timeout=2)
maintenance = Maintenance(channel=channel, timeout=2, username=self.username, password=self.password)
try:
await maintenance.status()
except grpc.RpcError:
Expand Down
41 changes: 41 additions & 0 deletions aioetcd3/exceptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
# All of the custom errors are inherited from the grpc.RpcError
# for the backward compatibility
from grpc import RpcError, StatusCode


class EtcdError(RpcError):
code = StatusCode.UNKNOWN

def __init__(self, details, debug_info=None):
self.details = details
self.debug_info = debug_info

def __repr__(self):
return "`{}`: reason: `{}`".format(self.code, self.details)


class AuthError(EtcdError):
code = StatusCode.INVALID_ARGUMENT


class Unauthenticated(EtcdError):
code = StatusCode.UNAUTHENTICATED


class InvalidArgument(EtcdError):
code = StatusCode.INVALID_ARGUMENT


class PermissionDenied(EtcdError):
code = StatusCode.PERMISSION_DENIED


class FailedPrecondition(EtcdError):
code = StatusCode.FAILED_PRECONDITION


STATUS_MAP = {
StatusCode.UNAUTHENTICATED: Unauthenticated,
StatusCode.PERMISSION_DENIED: PermissionDenied,
StatusCode.FAILED_PRECONDITION: FailedPrecondition,
}
16 changes: 12 additions & 4 deletions aioetcd3/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,16 @@ def dns_endpoint(dns_name):


def get_secure_creds(ca_cert, cert_key, cert_cert):
with open(ca_cert, 'rb') as ca_cert_file:
ca_cert_value = None
cert_key_value = None
cert_value = None
if ca_cert is not None:
with open(ca_cert, 'rb') as ca_cert_file:
ca_cert_value = ca_cert_file.read()
if cert_key is not None:
with open(cert_key, 'rb') as cert_key_file:
with open(cert_cert, 'rb') as cert_cert_file:
return ca_cert_file.read(), cert_key_file.read(), cert_cert_file.read()

cert_key_value = cert_key_file.read()
if cert_cert is not None:
with open(cert_cert, 'rb') as cert_cert_file:
cert_value = cert_cert_file.read()
return ca_cert_value, cert_key_value, cert_value
4 changes: 2 additions & 2 deletions aioetcd3/watch.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,14 +188,14 @@ def _clone(self):


class Watch(StubMixin):
def __init__(self, channel, timeout):
def __init__(self, channel, timeout, username=None, password=None):
# Put (WatchCreateRequest, output_queue, done_future) to create a watch
self._create_request_queue = _Pipe(5, loop=self._loop)
# Put (output_queue, done_future) to cancel a watch
self._cancel_request_queue = _Pipe(loop=self._loop)
self._reconnect_event = asyncio.Event(loop=self._loop)
self._watch_task_running = None
super().__init__(channel, timeout)
super().__init__(channel, timeout, username=username, password=password)

async def _watch_task(self, reconnect_event):
# Queue for WatchRequest
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

from setuptools import setup, find_packages

version = "1.11"
version = "1.12"

try:
import pypandoc
Expand Down
Loading

0 comments on commit 3274ab4

Please sign in to comment.