Skip to content
This repository has been archived by the owner on Mar 20, 2018. It is now read-only.

Update to support bundling #28

Merged
merged 1 commit into from
Feb 25, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
200 changes: 91 additions & 109 deletions google/gax/api_callable.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,8 @@ def _bundleable(a_func, desc, bundler):
"""
def inner(request):
"""Schedules execution of a bundling task."""
the_id = bundling.compute_bundle_id(request, desc.discriminator_fields)
the_id = bundling.compute_bundle_id(
request, desc.request_descriminator_fields)
return bundler.schedule(a_func, the_id, desc, request)

return inner
Expand Down Expand Up @@ -157,102 +158,104 @@ def inner(request):
return inner


class ApiCallDefaults(object):
"""Encapsulates the default settings for all ApiCallables in an API"""
class CallSettings(object):
"""Encapsulates the call settings for an ApiCallable"""
# pylint: disable=too-few-public-methods
def __init__(self, timeout=30, is_idempotent_retrying=True,
max_attempts=16, bundler=None):
def __init__(self, timeout=30, is_retrying=False, max_attempts=16,
page_descriptor=None, bundler=None, bundle_descriptor=None):
"""Constructor.

Args:
timeout (int): The client-side timeout for API calls.
is_idempotent_retrying (bool): If set, calls determined by
configuration to be idempotent will retry upon transient error
is_retrying (bool): If set, calls will retry upon transient error
by default.
max_attempts (int): The maximum number of attempts that should be
made for a retrying call to this service.
bundler (bundle.Executor): (optional) orchestrates bundling
made for a retrying call to this service. This parameter is
ignored if ``is_retrying`` is not set.
page_descriptor (PageDescriptor): indicates the structure of page
streaming to be performed. If set to None, page streaming is
not performed.
bundler (bundle.Executor): orchestrates bundling. If None, bundling
is not performed.
bundle_descriptor (BundleDescriptor): indicates the structure of
the bundle. If None, bundling is not performed.

Returns:
An ApiCallDefaults object.
A CallSettings object.
"""
self.timeout = timeout
self.is_idempotent_retrying = is_idempotent_retrying
self.is_retrying = is_retrying
self.max_attempts = max_attempts
self.page_descriptor = page_descriptor
self.bundler = bundler
self.bundle_descriptor = bundle_descriptor

def merge(self, options):
"""Returns a new CallSettings merged from this and a CallOptions object.

Args:
options: A CallOptions object whose values are override those in
this object. If None, `merge` returns a copy of this object.

Returns:
A CallSettings object.
"""
if not options:
return CallSettings(
timeout=self.timeout, is_retrying=self.is_retrying,
max_attempts=self.max_attempts,
page_descriptor=self.page_descriptor,
bundler=self.bundler, bundle_descriptor=self.bundle_descriptor)
else:
if options.timeout == OPTION_INHERIT:
timeout = self.timeout
else:
timeout = options.timeout

if options.is_retrying == OPTION_INHERIT:
is_retrying = self.is_retrying
else:
is_retrying = options.is_retrying

if options.max_attempts == OPTION_INHERIT:
max_attempts = self.max_attempts
else:
max_attempts = options.max_attempts

if options.is_page_streaming:
page_descriptor = self.page_descriptor
else:
page_descriptor = None

return CallSettings(
timeout=timeout, is_retrying=is_retrying,
max_attempts=max_attempts, page_descriptor=page_descriptor,
bundler=self.bundler, bundle_descriptor=self.bundle_descriptor)


class CallOptions(object):
"""Encapsulates the default settings for a particular API call"""
"""Encapsulates the overridable settings for a particular API call"""
# pylint: disable=too-few-public-methods
def __init__(self, timeout=OPTION_INHERIT, is_retrying=OPTION_INHERIT,
max_attempts=OPTION_INHERIT, page_streaming=OPTION_INHERIT,
bundle_descriptor=OPTION_INHERIT,
bundle_options=OPTION_INHERIT):
max_attempts=OPTION_INHERIT, is_page_streaming=OPTION_INHERIT):
"""Constructor.

Args:
timeout: The client-side timeout for API calls.
is_retrying: If set, call will retry upon transient error by
timeout (int): The client-side timeout for API calls.
is_retrying (bool): If set, call will retry upon transient error by
default.
max_attempts: The maximum number of attempts that should be made
for a retrying call to this service.
page_streaming (PageDescriptor): indicates the structure of page
streaming to be performed. If set to None, no page streaming
is performed.
bundle_descriptor (BundleDescriptor): indicates the structure of
bundling to be performed. Both ``bundle_descriptor`` and
``bundle_options`` must be set if bundling is to be performed.
bundle_options (BundleOptions): configures bundling thresholds.
Both ``bundle_descriptor`` and ``bundle options`` must be set
if bundling is to be performed.
max_attempts (int): The maximum number of attempts that should be
made for a retrying call to this service.
is_page_streaming (bool): If set and the call is
configured for page streaming, page streaming is performed.

Returns:
A CallOptions object.
"""
self.timeout = timeout
self.is_retrying = is_retrying
self.max_attempts = max_attempts
self.page_streaming = page_streaming
self.bundle_descriptor = bundle_descriptor
self.bundle_options = bundle_options

def update(self, options):
"""Merges another CallOptions object into this one.

Args:
options: A CallOptions object whose values are override those in
this object. If None, `update` has no effect.
"""
if not options:
return
if options.timeout != OPTION_INHERIT:
self.timeout = options.timeout
if options.is_retrying != OPTION_INHERIT:
self.is_retrying = options.is_retrying
if options.max_attempts != OPTION_INHERIT:
self.max_attempts = options.max_attempts
if options.page_streaming != OPTION_INHERIT:
self.page_streaming = options.page_streaming
if options.bundle_descriptor != OPTION_INHERIT:
self.bundle_descriptor = options.bundle_descriptor
if options.bundle_options != OPTION_INHERIT:
self.bundle_options = options.bundle_options

def normalize(self):
"""Transforms fields set to OPTION_INHERIT to None."""
if self.timeout == OPTION_INHERIT:
self.timeout = None
if self.is_retrying == OPTION_INHERIT:
self.is_retrying = None
if self.max_attempts == OPTION_INHERIT:
self.max_attempts = None
if self.page_streaming == OPTION_INHERIT:
self.page_streaming = None
if self.bundle_descriptor == OPTION_INHERIT:
self.bundle_descriptor = None
if self.bundle_options == OPTION_INHERIT:
self.bundle_options = None
self.is_page_streaming = is_page_streaming


class ApiCallable(object):
Expand All @@ -262,63 +265,42 @@ class ApiCallable(object):
Calling an object of ApiCallable type causes these calls to be transmitted.
"""
# pylint: disable=too-few-public-methods
def __init__(self, func, options=None, defaults=None, is_idempotent=False):
def __init__(self, func, settings):
"""Constructor.

Args:
func: The API call that this ApiCallable wraps.
options: A CallOptions object from which the settings for this call
are drawn.
defaults: A ApiCallDefaults object, from which default values
will be drawn if not supplied by `options`. The parameters in
`options` always override those in `defaults`. If the `timeout`
setting cannot be determined from either `options` or
`defaults`, a runtime error will result at call time. If the
`max_attempts` setting cannot be determined either from
`options` or `defaults`, a runtime error will result at call
time.
is_idempotent: If set, this call is marked as idempotent. Idempotent
calls' default retrying behavior may be set in `defaults`.
settings: A CallSettings object from which the settings for this
call are drawn.

Returns:
An ApiCallable object.
"""
self.options = CallOptions() if options is None else options
self.defaults = defaults
self.func = func
self.is_idempotent = is_idempotent

def _call_settings(self):
self.options.normalize()
if self.is_idempotent and self.options.is_retrying is None:
is_retrying = self.defaults.is_idempotent_retrying
else:
is_retrying = self.options.is_retrying
page_descriptor = self.options.page_streaming
max_attempts = self.options.max_attempts
timeout = self.options.timeout
if self.defaults is not None:
if max_attempts is None:
max_attempts = self.defaults.max_attempts
if timeout is None:
timeout = self.defaults.timeout
return is_retrying, max_attempts, page_descriptor, timeout
self.settings = settings

def __call__(self, *args, **kwargs):
the_func = self.func
is_retrying, max_attempts, page_desc, timeout = self._call_settings()

# Update the_func using each of the applicable function decorators
# before calling.
if is_retrying:
the_func = _retryable(the_func, max_attempts)
if page_desc:
if self.settings.is_retrying:
the_func = _retryable(the_func, self.settings.max_attempts)
if self.settings.page_descriptor:
if self.settings.bundler and self.settings.bundle_descriptor:
raise ValueError('ApiCallable has incompatible settings: '
'bundling and page streaming')
the_func = _page_streamable(
the_func,
page_desc.request_page_token_field,
page_desc.response_page_token_field,
page_desc.resource_field,
timeout)
self.settings.page_descriptor.request_page_token_field,
self.settings.page_descriptor.response_page_token_field,
self.settings.page_descriptor.resource_field,
self.settings.timeout)
else:
the_func = _add_timeout_arg(the_func, timeout)
the_func = _add_timeout_arg(the_func, self.settings.timeout)
if self.settings.bundler and self.settings.bundle_descriptor:
the_func = _bundleable(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

N.B: At this point, _bundleable is called with the_func which is still a callable([[request], response])
I refer to this in another comment.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We start out with (if you'll forgive some haskell-ish notation):
the_func :: (RequestType, int) -> (ResponseType)
_add_timeout_arg :: ((type_1, ..., type_n, int) -> X), int) -> (type_1, ..., type_n) -> X

hence,
_add_timeout_arg(the_func) :: RequestType -> ResponseType

So I think in test_api_callable, my_func must take a dummy_timeout, or else we can't meaningfully apply _add_timeout_arg to it in ApiCallable. Does that sounds right?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that makes sense, LGTM

the_func, self.settings.bundle_descriptor,
self.settings.bundler)

return the_func(*args, **kwargs)
Loading