Skip to content
This repository has been archived by the owner on Sep 14, 2020. It is now read-only.

Enable LGTM checks and fix few LGTM alerts #302

Merged
merged 5 commits into from
Jan 30, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
[![Build Status](https://travis-ci.org/zalando-incubator/kopf.svg?branch=master)](https://travis-ci.org/zalando-incubator/kopf)
[![codecov](https://codecov.io/gh/zalando-incubator/kopf/branch/master/graph/badge.svg)](https://codecov.io/gh/zalando-incubator/kopf)
[![Coverage Status](https://coveralls.io/repos/github/zalando-incubator/kopf/badge.svg?branch=master)](https://coveralls.io/github/zalando-incubator/kopf?branch=master)
[![Total alerts](https://img.shields.io/lgtm/alerts/g/zalando-incubator/kopf.svg?logo=lgtm&logoWidth=18)](https://lgtm.com/projects/g/zalando-incubator/kopf/alerts/)
[![Language grade: Python](https://img.shields.io/lgtm/grade/python/g/zalando-incubator/kopf.svg?logo=lgtm&logoWidth=18)](https://lgtm.com/projects/g/zalando-incubator/kopf/context:python)

**Kopf** —Kubernetes Operator Pythonic Framework— is a framework and a library
to make Kubernetes operators development easier, just in few lines of Python code.
Expand Down
2 changes: 1 addition & 1 deletion examples/04-events/example.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ def create_fn(body, **kwargs):

try:
raise RuntimeError("Exception text.")
except:
except Exception:
kopf.exception(body, reason="SomeReason", message="Some exception:")
3 changes: 1 addition & 2 deletions examples/13-hooks/example.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,5 +86,4 @@ async def _task_fn(logger, shouldstop: asyncio.Event) -> NoReturn:
while not shouldstop.is_set():
await asyncio.sleep(random.randint(1, 10))
logger.info("Served by the background task.")
else:
logger.info("Serving is finished by request.")
logger.info("Serving is finished by request.")
8 changes: 3 additions & 5 deletions kopf/clients/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,7 @@ async def wrapper(*args: Any, **kwargs: Any) -> Any:
await vault.invalidate(key, exc=e)
else:
raise
else:
raise credentials.LoginError("Ran out of connection credentials.")
raise credentials.LoginError("Ran out of connection credentials.")
return cast(_F, wrapper)


Expand Down Expand Up @@ -79,14 +78,13 @@ async def wrapper(*args: Any, **kwargs: Any) -> Any:
try:
async for item in fn(*args, **kwargs, context=context):
yield item
break # out of credentials cycle (instead of `return`)
return
except aiohttp.ClientResponseError as e:
if e.status == 401:
await vault.invalidate(key, exc=e)
else:
raise
else:
raise credentials.LoginError("Ran out of connection credentials.")
raise credentials.LoginError("Ran out of connection credentials.")
return cast(_F, wrapper)


Expand Down
46 changes: 23 additions & 23 deletions kopf/on.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ def creation_handler(**kwargs):
ActivityHandlerDecorator = Callable[[callbacks.ActivityHandlerFn], callbacks.ActivityHandlerFn]


def startup(
def startup( # lgtm[py/similar-function]
*,
id: Optional[str] = None,
errors: Optional[errors_.ErrorsMode] = None,
Expand All @@ -36,8 +36,8 @@ def startup(
cooldown: Optional[float] = None, # deprecated, use `backoff`
registry: Optional[registries.OperatorRegistry] = None,
) -> ActivityHandlerDecorator:
actual_registry = registry if registry is not None else registries.get_default_registry()
def decorator(fn: callbacks.ActivityHandlerFn) -> callbacks.ActivityHandlerFn:
actual_registry = registry if registry is not None else registries.get_default_registry()
return actual_registry.register_activity_handler(
fn=fn, id=id,
errors=errors, timeout=timeout, retries=retries, backoff=backoff, cooldown=cooldown,
Expand All @@ -46,7 +46,7 @@ def decorator(fn: callbacks.ActivityHandlerFn) -> callbacks.ActivityHandlerFn:
return decorator


def cleanup(
def cleanup( # lgtm[py/similar-function]
*,
id: Optional[str] = None,
errors: Optional[errors_.ErrorsMode] = None,
Expand All @@ -56,8 +56,8 @@ def cleanup(
cooldown: Optional[float] = None, # deprecated, use `backoff`
registry: Optional[registries.OperatorRegistry] = None,
) -> ActivityHandlerDecorator:
actual_registry = registry if registry is not None else registries.get_default_registry()
def decorator(fn: callbacks.ActivityHandlerFn) -> callbacks.ActivityHandlerFn:
actual_registry = registry if registry is not None else registries.get_default_registry()
return actual_registry.register_activity_handler(
fn=fn, id=id,
errors=errors, timeout=timeout, retries=retries, backoff=backoff, cooldown=cooldown,
Expand All @@ -66,7 +66,7 @@ def decorator(fn: callbacks.ActivityHandlerFn) -> callbacks.ActivityHandlerFn:
return decorator


def login(
def login( # lgtm[py/similar-function]
*,
id: Optional[str] = None,
errors: Optional[errors_.ErrorsMode] = None,
Expand All @@ -77,8 +77,8 @@ def login(
registry: Optional[registries.OperatorRegistry] = None,
) -> ActivityHandlerDecorator:
""" ``@kopf.on.login()`` handler for custom (re-)authentication. """
actual_registry = registry if registry is not None else registries.get_default_registry()
def decorator(fn: callbacks.ActivityHandlerFn) -> callbacks.ActivityHandlerFn:
actual_registry = registry if registry is not None else registries.get_default_registry()
return actual_registry.register_activity_handler(
fn=fn, id=id,
errors=errors, timeout=timeout, retries=retries, backoff=backoff, cooldown=cooldown,
Expand All @@ -87,7 +87,7 @@ def decorator(fn: callbacks.ActivityHandlerFn) -> callbacks.ActivityHandlerFn:
return decorator


def probe(
def probe( # lgtm[py/similar-function]
*,
id: Optional[str] = None,
errors: Optional[errors_.ErrorsMode] = None,
Expand All @@ -98,8 +98,8 @@ def probe(
registry: Optional[registries.OperatorRegistry] = None,
) -> ActivityHandlerDecorator:
""" ``@kopf.on.probe()`` handler for arbitrary liveness metrics. """
actual_registry = registry if registry is not None else registries.get_default_registry()
def decorator(fn: callbacks.ActivityHandlerFn) -> callbacks.ActivityHandlerFn:
actual_registry = registry if registry is not None else registries.get_default_registry()
return actual_registry.register_activity_handler(
fn=fn, id=id,
errors=errors, timeout=timeout, retries=retries, backoff=backoff, cooldown=cooldown,
Expand All @@ -108,7 +108,7 @@ def decorator(fn: callbacks.ActivityHandlerFn) -> callbacks.ActivityHandlerFn:
return decorator


def resume(
def resume( # lgtm[py/similar-function]
group: str, version: str, plural: str,
*,
id: Optional[str] = None,
Expand All @@ -124,8 +124,8 @@ def resume(
when: Optional[callbacks.WhenHandlerFn] = None,
) -> ResourceHandlerDecorator:
""" ``@kopf.on.resume()`` handler for the object resuming on operator (re)start. """
actual_registry = registry if registry is not None else registries.get_default_registry()
def decorator(fn: callbacks.ResourceHandlerFn) -> callbacks.ResourceHandlerFn:
actual_registry = registry if registry is not None else registries.get_default_registry()
return actual_registry.register_resource_changing_handler(
group=group, version=version, plural=plural,
reason=None, initial=True, deleted=deleted, id=id,
Expand All @@ -135,7 +135,7 @@ def decorator(fn: callbacks.ResourceHandlerFn) -> callbacks.ResourceHandlerFn:
return decorator


def create(
def create( # lgtm[py/similar-function]
group: str, version: str, plural: str,
*,
id: Optional[str] = None,
Expand All @@ -150,8 +150,8 @@ def create(
when: Optional[callbacks.WhenHandlerFn] = None,
) -> ResourceHandlerDecorator:
""" ``@kopf.on.create()`` handler for the object creation. """
actual_registry = registry if registry is not None else registries.get_default_registry()
def decorator(fn: callbacks.ResourceHandlerFn) -> callbacks.ResourceHandlerFn:
actual_registry = registry if registry is not None else registries.get_default_registry()
return actual_registry.register_resource_changing_handler(
group=group, version=version, plural=plural,
reason=causation.Reason.CREATE, id=id,
Expand All @@ -161,7 +161,7 @@ def decorator(fn: callbacks.ResourceHandlerFn) -> callbacks.ResourceHandlerFn:
return decorator


def update(
def update( # lgtm[py/similar-function]
group: str, version: str, plural: str,
*,
id: Optional[str] = None,
Expand All @@ -176,8 +176,8 @@ def update(
when: Optional[callbacks.WhenHandlerFn] = None,
) -> ResourceHandlerDecorator:
""" ``@kopf.on.update()`` handler for the object update or change. """
actual_registry = registry if registry is not None else registries.get_default_registry()
def decorator(fn: callbacks.ResourceHandlerFn) -> callbacks.ResourceHandlerFn:
actual_registry = registry if registry is not None else registries.get_default_registry()
return actual_registry.register_resource_changing_handler(
group=group, version=version, plural=plural,
reason=causation.Reason.UPDATE, id=id,
Expand All @@ -187,7 +187,7 @@ def decorator(fn: callbacks.ResourceHandlerFn) -> callbacks.ResourceHandlerFn:
return decorator


def delete(
def delete( # lgtm[py/similar-function]
group: str, version: str, plural: str,
*,
id: Optional[str] = None,
Expand All @@ -203,8 +203,8 @@ def delete(
when: Optional[callbacks.WhenHandlerFn] = None,
) -> ResourceHandlerDecorator:
""" ``@kopf.on.delete()`` handler for the object deletion. """
actual_registry = registry if registry is not None else registries.get_default_registry()
def decorator(fn: callbacks.ResourceHandlerFn) -> callbacks.ResourceHandlerFn:
actual_registry = registry if registry is not None else registries.get_default_registry()
return actual_registry.register_resource_changing_handler(
group=group, version=version, plural=plural,
reason=causation.Reason.DELETE, id=id,
Expand All @@ -215,7 +215,7 @@ def decorator(fn: callbacks.ResourceHandlerFn) -> callbacks.ResourceHandlerFn:
return decorator


def field(
def field( # lgtm[py/similar-function]
group: str, version: str, plural: str,
field: dicts.FieldSpec,
*,
Expand All @@ -231,8 +231,8 @@ def field(
when: Optional[callbacks.WhenHandlerFn] = None,
) -> ResourceHandlerDecorator:
""" ``@kopf.on.field()`` handler for the individual field changes. """
actual_registry = registry if registry is not None else registries.get_default_registry()
def decorator(fn: callbacks.ResourceHandlerFn) -> callbacks.ResourceHandlerFn:
actual_registry = registry if registry is not None else registries.get_default_registry()
return actual_registry.register_resource_changing_handler(
group=group, version=version, plural=plural,
reason=None, field=field, id=id,
Expand All @@ -242,7 +242,7 @@ def decorator(fn: callbacks.ResourceHandlerFn) -> callbacks.ResourceHandlerFn:
return decorator


def event(
def event( # lgtm[py/similar-function]
group: str, version: str, plural: str,
*,
id: Optional[str] = None,
Expand All @@ -252,8 +252,8 @@ def event(
when: Optional[callbacks.WhenHandlerFn] = None,
) -> ResourceHandlerDecorator:
""" ``@kopf.on.event()`` handler for the silent spies on the events. """
actual_registry = registry if registry is not None else registries.get_default_registry()
def decorator(fn: callbacks.ResourceHandlerFn) -> callbacks.ResourceHandlerFn:
actual_registry = registry if registry is not None else registries.get_default_registry()
return actual_registry.register_resource_watching_handler(
group=group, version=version, plural=plural,
id=id, fn=fn, labels=labels, annotations=annotations, when=when,
Expand All @@ -263,7 +263,7 @@ def decorator(fn: callbacks.ResourceHandlerFn) -> callbacks.ResourceHandlerFn:

# TODO: find a better name: `@kopf.on.this` is confusing and does not fully
# TODO: match with the `@kopf.on.{cause}` pattern, where cause is create/update/delete.
def this(
def this( # lgtm[py/similar-function]
*,
id: Optional[str] = None,
errors: Optional[errors_.ErrorsMode] = None,
Expand Down Expand Up @@ -302,16 +302,16 @@ def create_task(*, spec, task=task, **kwargs):
Note: ``task=task`` is needed to freeze the closure variable, so that every
create function will have its own value, not the latest in the for-cycle.
"""
actual_registry = registry if registry is not None else handling.subregistry_var.get()
def decorator(fn: callbacks.ResourceHandlerFn) -> callbacks.ResourceHandlerFn:
actual_registry = registry if registry is not None else handling.subregistry_var.get()
return actual_registry.register(
id=id, fn=fn,
errors=errors, timeout=timeout, retries=retries, backoff=backoff, cooldown=cooldown,
)
return decorator


def register(
def register( # lgtm[py/similar-function]
fn: callbacks.ResourceHandlerFn,
*,
id: Optional[str] = None,
Expand Down
47 changes: 24 additions & 23 deletions kopf/reactor/running.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,7 @@ def login(
for outcome in e.outcomes.values():
if isinstance(outcome.exception, credentials.LoginError):
raise outcome.exception
else:
raise
raise


def run(
Expand Down Expand Up @@ -309,32 +308,34 @@ async def run_tasks(
Only the tasks that existed before the operator startup are ignored
(for example, those that spawned the operator itself).
"""

# Run the infinite tasks until one of them fails/exits (they never exit normally).
# If the operator is cancelled, propagate the cancellation to all the sub-tasks.
# There is no graceful period: cancel as soon as possible, but allow them to finish.
try:
# Run the infinite tasks until one of them fails/exits (they never exit normally).
root_done, root_pending = await _wait(root_tasks, return_when=asyncio.FIRST_COMPLETED)
except asyncio.CancelledError:
# If the operator is cancelled, propagate the cancellation to all the sub-tasks.
# There is no graceful period: cancel as soon as possible, but allow them to finish.
root_cancelled, root_left = await _stop(root_tasks, title="Root", cancelled=True)
await _stop(root_tasks, title="Root", cancelled=True)
hung_tasks = await _all_tasks(ignored=ignored)
hung_cancelled, hung_left = await _stop(hung_tasks, title="Hung", cancelled=True)
await _stop(hung_tasks, title="Hung", cancelled=True)
raise
else:
# If the operator is intact, but one of the root tasks has exited (successfully or not),
# cancel all the remaining root tasks, and gracefully exit other spawned sub-tasks.
root_cancelled, root_left = await _stop(root_pending, title="Root", cancelled=False)
hung_tasks = await _all_tasks(ignored=ignored)
try:
# After the root tasks are all gone, cancel any spawned sub-tasks (e.g. handlers).
# TODO: assumption! the loop is not fully ours! find a way to cancel our spawned tasks.
hung_done, hung_pending = await _wait(hung_tasks, timeout=5.0)
except asyncio.CancelledError:
# If the operator is cancelled, propagate the cancellation to all the sub-tasks.
hung_cancelled, hung_left = await _stop(hung_tasks, title="Hung", cancelled=True)
raise
else:
# If the operator is intact, but the timeout is reached, forcely cancel the sub-tasks.
hung_cancelled, hung_left = await _stop(hung_pending, title="Hung", cancelled=False)

# If the operator is intact, but one of the root tasks has exited (successfully or not),
# cancel all the remaining root tasks, and gracefully exit other spawned sub-tasks.
root_cancelled, _ = await _stop(root_pending, title="Root", cancelled=False)

# After the root tasks are all gone, cancel any spawned sub-tasks (e.g. handlers).
# If the operator is cancelled, propagate the cancellation to all the sub-tasks.
# TODO: an assumption! the loop is not fully ours! find a way to cancel only our spawned tasks.
hung_tasks = await _all_tasks(ignored=ignored)
try:
hung_done, hung_pending = await _wait(hung_tasks, timeout=5.0)
except asyncio.CancelledError:
await _stop(hung_tasks, title="Hung", cancelled=True)
raise

# If the operator is intact, but the timeout is reached, forcely cancel the sub-tasks.
hung_cancelled, _ = await _stop(hung_pending, title="Hung", cancelled=False)

# If succeeded or if cancellation is silenced, re-raise from failed tasks (if any).
await _reraise(root_done | root_cancelled | hung_done | hung_cancelled)
Expand Down