diff --git a/google/gax/api_callable.py b/google/gax/api_callable.py index c8ec7da..4599a31 100644 --- a/google/gax/api_callable.py +++ b/google/gax/api_callable.py @@ -117,7 +117,8 @@ def _bundleable(a_func, desc, bundler): """ def inner(request): """Schedules execution of a bundling task.""" - the_id = bundling.compute_bundle_id(request, desc.discriminator_fields) + the_id = bundling.compute_bundle_id( + request, desc.request_descriminator_fields) return bundler.schedule(a_func, the_id, desc, request) return inner @@ -157,55 +158,96 @@ def inner(request): return inner -class ApiCallDefaults(object): - """Encapsulates the default settings for all ApiCallables in an API""" +class CallSettings(object): + """Encapsulates the call settings for an ApiCallable""" # pylint: disable=too-few-public-methods - def __init__(self, timeout=30, is_idempotent_retrying=True, - max_attempts=16, bundler=None): + def __init__(self, timeout=30, is_retrying=False, max_attempts=16, + page_descriptor=None, bundler=None, bundle_descriptor=None): """Constructor. Args: timeout (int): The client-side timeout for API calls. - is_idempotent_retrying (bool): If set, calls determined by - configuration to be idempotent will retry upon transient error + is_retrying (bool): If set, calls will retry upon transient error by default. max_attempts (int): The maximum number of attempts that should be - made for a retrying call to this service. - bundler (bundle.Executor): (optional) orchestrates bundling + made for a retrying call to this service. This parameter is + ignored if ``is_retrying`` is not set. + page_descriptor (PageDescriptor): indicates the structure of page + streaming to be performed. If set to None, page streaming is + not performed. + bundler (bundle.Executor): orchestrates bundling. If None, bundling + is not performed. + bundle_descriptor (BundleDescriptor): indicates the structure of + the bundle. If None, bundling is not performed. Returns: - An ApiCallDefaults object. + A CallSettings object. """ self.timeout = timeout - self.is_idempotent_retrying = is_idempotent_retrying + self.is_retrying = is_retrying self.max_attempts = max_attempts + self.page_descriptor = page_descriptor self.bundler = bundler + self.bundle_descriptor = bundle_descriptor + + def merge(self, options): + """Returns a new CallSettings merged from this and a CallOptions object. + + Args: + options: A CallOptions object whose values are override those in + this object. If None, `merge` returns a copy of this object. + + Returns: + A CallSettings object. + """ + if not options: + return CallSettings( + timeout=self.timeout, is_retrying=self.is_retrying, + max_attempts=self.max_attempts, + page_descriptor=self.page_descriptor, + bundler=self.bundler, bundle_descriptor=self.bundle_descriptor) + else: + if options.timeout == OPTION_INHERIT: + timeout = self.timeout + else: + timeout = options.timeout + + if options.is_retrying == OPTION_INHERIT: + is_retrying = self.is_retrying + else: + is_retrying = options.is_retrying + + if options.max_attempts == OPTION_INHERIT: + max_attempts = self.max_attempts + else: + max_attempts = options.max_attempts + + if options.is_page_streaming: + page_descriptor = self.page_descriptor + else: + page_descriptor = None + + return CallSettings( + timeout=timeout, is_retrying=is_retrying, + max_attempts=max_attempts, page_descriptor=page_descriptor, + bundler=self.bundler, bundle_descriptor=self.bundle_descriptor) class CallOptions(object): - """Encapsulates the default settings for a particular API call""" + """Encapsulates the overridable settings for a particular API call""" # pylint: disable=too-few-public-methods def __init__(self, timeout=OPTION_INHERIT, is_retrying=OPTION_INHERIT, - max_attempts=OPTION_INHERIT, page_streaming=OPTION_INHERIT, - bundle_descriptor=OPTION_INHERIT, - bundle_options=OPTION_INHERIT): + max_attempts=OPTION_INHERIT, is_page_streaming=OPTION_INHERIT): """Constructor. Args: - timeout: The client-side timeout for API calls. - is_retrying: If set, call will retry upon transient error by + timeout (int): The client-side timeout for API calls. + is_retrying (bool): If set, call will retry upon transient error by default. - max_attempts: The maximum number of attempts that should be made - for a retrying call to this service. - page_streaming (PageDescriptor): indicates the structure of page - streaming to be performed. If set to None, no page streaming - is performed. - bundle_descriptor (BundleDescriptor): indicates the structure of - bundling to be performed. Both ``bundle_descriptor`` and - ``bundle_options`` must be set if bundling is to be performed. - bundle_options (BundleOptions): configures bundling thresholds. - Both ``bundle_descriptor`` and ``bundle options`` must be set - if bundling is to be performed. + max_attempts (int): The maximum number of attempts that should be + made for a retrying call to this service. + is_page_streaming (bool): If set and the call is + configured for page streaming, page streaming is performed. Returns: A CallOptions object. @@ -213,46 +255,7 @@ def __init__(self, timeout=OPTION_INHERIT, is_retrying=OPTION_INHERIT, self.timeout = timeout self.is_retrying = is_retrying self.max_attempts = max_attempts - self.page_streaming = page_streaming - self.bundle_descriptor = bundle_descriptor - self.bundle_options = bundle_options - - def update(self, options): - """Merges another CallOptions object into this one. - - Args: - options: A CallOptions object whose values are override those in - this object. If None, `update` has no effect. - """ - if not options: - return - if options.timeout != OPTION_INHERIT: - self.timeout = options.timeout - if options.is_retrying != OPTION_INHERIT: - self.is_retrying = options.is_retrying - if options.max_attempts != OPTION_INHERIT: - self.max_attempts = options.max_attempts - if options.page_streaming != OPTION_INHERIT: - self.page_streaming = options.page_streaming - if options.bundle_descriptor != OPTION_INHERIT: - self.bundle_descriptor = options.bundle_descriptor - if options.bundle_options != OPTION_INHERIT: - self.bundle_options = options.bundle_options - - def normalize(self): - """Transforms fields set to OPTION_INHERIT to None.""" - if self.timeout == OPTION_INHERIT: - self.timeout = None - if self.is_retrying == OPTION_INHERIT: - self.is_retrying = None - if self.max_attempts == OPTION_INHERIT: - self.max_attempts = None - if self.page_streaming == OPTION_INHERIT: - self.page_streaming = None - if self.bundle_descriptor == OPTION_INHERIT: - self.bundle_descriptor = None - if self.bundle_options == OPTION_INHERIT: - self.bundle_options = None + self.is_page_streaming = is_page_streaming class ApiCallable(object): @@ -262,63 +265,42 @@ class ApiCallable(object): Calling an object of ApiCallable type causes these calls to be transmitted. """ # pylint: disable=too-few-public-methods - def __init__(self, func, options=None, defaults=None, is_idempotent=False): + def __init__(self, func, settings): """Constructor. Args: func: The API call that this ApiCallable wraps. - options: A CallOptions object from which the settings for this call - are drawn. - defaults: A ApiCallDefaults object, from which default values - will be drawn if not supplied by `options`. The parameters in - `options` always override those in `defaults`. If the `timeout` - setting cannot be determined from either `options` or - `defaults`, a runtime error will result at call time. If the - `max_attempts` setting cannot be determined either from - `options` or `defaults`, a runtime error will result at call - time. - is_idempotent: If set, this call is marked as idempotent. Idempotent - calls' default retrying behavior may be set in `defaults`. + settings: A CallSettings object from which the settings for this + call are drawn. Returns: An ApiCallable object. """ - self.options = CallOptions() if options is None else options - self.defaults = defaults self.func = func - self.is_idempotent = is_idempotent - - def _call_settings(self): - self.options.normalize() - if self.is_idempotent and self.options.is_retrying is None: - is_retrying = self.defaults.is_idempotent_retrying - else: - is_retrying = self.options.is_retrying - page_descriptor = self.options.page_streaming - max_attempts = self.options.max_attempts - timeout = self.options.timeout - if self.defaults is not None: - if max_attempts is None: - max_attempts = self.defaults.max_attempts - if timeout is None: - timeout = self.defaults.timeout - return is_retrying, max_attempts, page_descriptor, timeout + self.settings = settings def __call__(self, *args, **kwargs): the_func = self.func - is_retrying, max_attempts, page_desc, timeout = self._call_settings() # Update the_func using each of the applicable function decorators # before calling. - if is_retrying: - the_func = _retryable(the_func, max_attempts) - if page_desc: + if self.settings.is_retrying: + the_func = _retryable(the_func, self.settings.max_attempts) + if self.settings.page_descriptor: + if self.settings.bundler and self.settings.bundle_descriptor: + raise ValueError('ApiCallable has incompatible settings: ' + 'bundling and page streaming') the_func = _page_streamable( the_func, - page_desc.request_page_token_field, - page_desc.response_page_token_field, - page_desc.resource_field, - timeout) + self.settings.page_descriptor.request_page_token_field, + self.settings.page_descriptor.response_page_token_field, + self.settings.page_descriptor.resource_field, + self.settings.timeout) else: - the_func = _add_timeout_arg(the_func, timeout) + the_func = _add_timeout_arg(the_func, self.settings.timeout) + if self.settings.bundler and self.settings.bundle_descriptor: + the_func = _bundleable( + the_func, self.settings.bundle_descriptor, + self.settings.bundler) + return the_func(*args, **kwargs) diff --git a/test/test_api_callable.py b/test/test_api_callable.py index d03af12..15c6724 100644 --- a/test/test_api_callable.py +++ b/test/test_api_callable.py @@ -34,7 +34,8 @@ import mock import unittest2 -from google.gax import api_callable, PageDescriptor +from google.gax import (api_callable, bundling, PageDescriptor, + BundleDescriptor, BundleOptions) from grpc.framework.interfaces.face import face _DUMMY_ERROR = face.AbortionError(None, None, None, None) @@ -43,7 +44,9 @@ class TestApiCallable(unittest2.TestCase): def test_call_api_callable(self): - my_callable = api_callable.ApiCallable(lambda _req, _timeout: 42) + settings = api_callable.CallSettings() + my_callable = api_callable.ApiCallable( + lambda _req, _timeout: 42, settings) self.assertEqual(my_callable(None), 42) def test_retry(self): @@ -54,9 +57,9 @@ def test_retry(self): mock_grpc.side_effect = ([_DUMMY_ERROR] * (to_attempt - 1) + [mock.DEFAULT]) mock_grpc.return_value = 1729 - options = api_callable.CallOptions( + settings = api_callable.CallSettings( timeout=0, is_retrying=True, max_attempts=to_attempt) - my_callable = api_callable.ApiCallable(mock_grpc, options=options) + my_callable = api_callable.ApiCallable(mock_grpc, settings) self.assertEqual(my_callable(None), 1729) self.assertEqual(mock_grpc.call_count, to_attempt) @@ -65,9 +68,9 @@ def test_retry_aborts(self): with mock.patch('grpc.framework.crust.implementations.' '_UnaryUnaryMultiCallable') as mock_grpc: mock_grpc.side_effect = _DUMMY_ERROR - options = api_callable.CallOptions( + settings = api_callable.CallSettings( timeout=0, is_retrying=True, max_attempts=to_attempt) - my_callable = api_callable.ApiCallable(mock_grpc, options=options) + my_callable = api_callable.ApiCallable(mock_grpc, settings) self.assertRaises(face.AbortionError, my_callable, None) self.assertEqual(mock_grpc.call_count, to_attempt) @@ -89,7 +92,7 @@ def __init__(self, nums=(), next_page_token=0): self.nums = nums self.next_page_token = next_page_token - mock_grpc_func_descriptor = PageDescriptor( + fake_grpc_func_descriptor = PageDescriptor( 'page_token', 'next_page_token', 'nums') def grpc_return_value(request, *dummy_args, **dummy_kwargs): @@ -108,82 +111,86 @@ def grpc_return_value(request, *dummy_args, **dummy_kwargs): with mock.patch('grpc.framework.crust.implementations.' '_UnaryUnaryMultiCallable') as mock_grpc: mock_grpc.side_effect = grpc_return_value - options = api_callable.CallOptions( - page_streaming=mock_grpc_func_descriptor, timeout=0) - my_callable = api_callable.ApiCallable(mock_grpc, options=options) + settings = api_callable.CallSettings( + page_descriptor=fake_grpc_func_descriptor, timeout=0) + my_callable = api_callable.ApiCallable(mock_grpc, settings=settings) self.assertEqual(list(my_callable(PageStreamingRequest())), list(range(page_size * pages_to_stream))) - def test_defaults_override_apicallable_defaults(self): - defaults = api_callable.ApiCallDefaults(timeout=10, max_attempts=6) - my_callable = api_callable.ApiCallable(None, defaults=defaults) - _, max_attempts, _, timeout = my_callable._call_settings() - self.assertEqual(timeout, 10) - self.assertEqual(max_attempts, 6) - - def test_constructor_values_override_defaults(self): - defaults = api_callable.ApiCallDefaults(timeout=10, max_attempts=6) - options = api_callable.CallOptions(timeout=100, max_attempts=60) - my_callable = api_callable.ApiCallable( - None, options=options, defaults=defaults) - _, max_attempts, _, timeout = my_callable._call_settings() - self.assertEqual(timeout, 100) - self.assertEqual(max_attempts, 60) - - def test_idempotent_default_retry(self): - defaults = api_callable.ApiCallDefaults( - is_idempotent_retrying=True) + def test_bundling_page_streaming_error(self): + settings = api_callable.CallSettings( + page_descriptor=object(), bundle_descriptor=object(), + bundler=object()) my_callable = api_callable.ApiCallable( - None, defaults=defaults, is_idempotent=True) - is_retrying, _, _, _ = my_callable._call_settings() - self.assertTrue(is_retrying) - - def test_idempotent_default_override(self): - defaults = api_callable.ApiCallDefaults( - is_idempotent_retrying=False) - options = api_callable.CallOptions(is_retrying=True) - my_callable = api_callable.ApiCallable( - None, options=options, defaults=defaults, is_idempotent=True) - is_retrying, _, _, _ = my_callable._call_settings() - self.assertTrue(is_retrying) + lambda _req, _timeout: 42, settings) + with self.assertRaises(ValueError): + my_callable(None) + + def test_bundling(self): + # pylint: disable=abstract-method, too-few-public-methods + class BundlingRequest(object): + def __init__(self, messages=None): + self.messages = messages + + fake_grpc_func_descriptor = BundleDescriptor('messages', []) + bundler = bundling.Executor(BundleOptions(message_count_threshold=8)) + + def my_func(request, dummy_timeout): + return len(request.messages) + + settings = api_callable.CallSettings( + bundler=bundler, bundle_descriptor=fake_grpc_func_descriptor, + timeout=0) + my_callable = api_callable.ApiCallable(my_func, settings) + first = my_callable(BundlingRequest([0] * 3)) + self.assertIsInstance(first, bundling.Event) + self.assertIsNone(first.result) # pylint: disable=no-member + second = my_callable(BundlingRequest([0] * 5)) + self.assertEquals(second.result, 8) # pylint: disable=no-member def test_call_options_simple(self): options = api_callable.CallOptions(timeout=23, is_retrying=True) self.assertEqual(options.timeout, 23) self.assertTrue(options.is_retrying) - self.assertEqual(options.page_streaming, api_callable.OPTION_INHERIT) + self.assertEqual(options.is_page_streaming, api_callable.OPTION_INHERIT) self.assertEqual(options.max_attempts, api_callable.OPTION_INHERIT) - def test_call_options_update(self): - first = api_callable.CallOptions(timeout=46, is_retrying=True) - second = api_callable.CallOptions( - timeout=9, page_streaming=False, max_attempts=16) - first.update(second) - self.assertEqual(first.timeout, 9) - self.assertTrue(first.is_retrying) - self.assertFalse(first.page_streaming) - self.assertEqual(first.max_attempts, 16) - - def test_call_options_update_none(self): - options = api_callable.CallOptions(timeout=23, page_streaming=False) - options.update(None) - self.assertEqual(options.timeout, 23) - self.assertEqual(options.is_retrying, api_callable.OPTION_INHERIT) - self.assertFalse(options.page_streaming) - self.assertEqual(options.max_attempts, api_callable.OPTION_INHERIT) - - def test_call_options_normalize(self): - options = api_callable.CallOptions(timeout=23, is_retrying=True) - options.normalize() - self.assertEqual(options.timeout, 23) - self.assertTrue(options.is_retrying) - self.assertIsNone(options.page_streaming) - self.assertIsNone(options.max_attempts) - - def test_call_options_normalize_default(self): - options = api_callable.CallOptions() - options.normalize() - self.assertIsNone(options.timeout) - self.assertIsNone(options.is_retrying) - self.assertIsNone(options.page_streaming) - self.assertIsNone(options.max_attempts) + def test_settings_merge_options1(self): + options = api_callable.CallOptions(timeout=46, is_retrying=True) + settings = api_callable.CallSettings( + timeout=9, page_descriptor=None, max_attempts=16) + final = settings.merge(options) + self.assertEqual(final.timeout, 46) + self.assertTrue(final.is_retrying) + self.assertIsNone(final.page_descriptor) + self.assertEqual(final.max_attempts, 16) + + def test_settings_merge_options2(self): + options = api_callable.CallOptions(max_attempts=1) + settings = api_callable.CallSettings( + timeout=9, page_descriptor=None, max_attempts=16) + final = settings.merge(options) + self.assertEqual(final.timeout, 9) + self.assertFalse(final.is_retrying) + self.assertIsNone(final.page_descriptor) + self.assertEqual(final.max_attempts, 1) + + def test_settings_merge_options_page_streaming(self): + options = api_callable.CallOptions(timeout=46, is_page_streaming=False) + settings = api_callable.CallSettings(timeout=9, max_attempts=16) + final = settings.merge(options) + self.assertEqual(final.timeout, 46) + self.assertFalse(final.is_retrying) + self.assertIsNone(final.page_descriptor) + self.assertEqual(final.max_attempts, 16) + + def test_settings_merge_none(self): + settings = api_callable.CallSettings( + timeout=23, page_descriptor=object(), bundler=object()) + final = settings.merge(None) + self.assertEqual(final.timeout, settings.timeout) + self.assertEqual(final.is_retrying, settings.is_retrying) + self.assertEqual(final.max_attempts, settings.max_attempts) + self.assertEqual(final.page_descriptor, settings.page_descriptor) + self.assertEqual(final.bundler, settings.bundler) + self.assertEqual(final.bundle_descriptor, settings.bundle_descriptor)