Skip to content

Commit

Permalink
Merge branch 'qds-sdk-allocation-strategy' of github.com:kswap/qds-sd…
Browse files Browse the repository at this point in the history
…k-py into qds-sdk-allocation-strategy
  • Loading branch information
Swapnil Kumar committed Feb 19, 2020
2 parents 44b96d4 + ed831b7 commit 1e885e9
Show file tree
Hide file tree
Showing 11 changed files with 1,082 additions and 56 deletions.
36 changes: 33 additions & 3 deletions bin/qds.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from qds_sdk.template import TemplateCmdLine
from qds_sdk.clusterv2 import ClusterCmdLine
from qds_sdk.sensors import *
from qds_sdk.quest import QuestCmdLine
import os
import sys
import traceback
Expand Down Expand Up @@ -88,6 +89,8 @@
" action --help\n"
"\nScheduler subcommand:\n"
" scheduler --help\n"
"\nQuest subcommand:\n"
" quest --help\n"
"\nTemplate subcommand:\n"
" template --help\n"
"\nAccount subcommand:\n"
Expand Down Expand Up @@ -554,6 +557,10 @@ def templatemain(args):
result = TemplateCmdLine.run(args)
print(result)

def questmain(args):
result = QuestCmdLine.run(args)
print(result)


def main():
optparser = OptionParser(usage=usage_str)
Expand Down Expand Up @@ -582,6 +589,18 @@ def main():
default=os.getenv('CLOUD_PROVIDER'),
help="cloud", choices=["AWS", "AZURE", "ORACLE_BMC", "ORACLE_OPC", "GCP"])

optparser.add_option("--base_retry_delay", dest="base_retry_delay",
type=int,
default=os.getenv('QDS_BASE_RETRY_DELAY'),
help="base sleep interval for exponential backoff in case of "
"retryable exceptions.Defaults to 10s.")

optparser.add_option("--max_retries", dest="max_retries",
type=int,
default=os.getenv('QDS_MAX_RETRIES'),
help="Number of re-attempts for an api-call in case of "
" retryable exceptions. Defaults to 5.")

optparser.add_option("-v", dest="verbose", action="store_true",
default=False,
help="verbose mode - info level logging")
Expand All @@ -592,7 +611,7 @@ def main():

optparser.disable_interspersed_args()
(options, args) = optparser.parse_args()

if options.chatty:
logging.basicConfig(level=logging.DEBUG)
elif options.verbose:
Expand All @@ -613,6 +632,12 @@ def main():
if options.poll_interval is None:
options.poll_interval = 5

if options.max_retries is None:
options.max_retries = 5

if options.base_retry_delay is None:
options.base_retry_delay = 10

if options.cloud_name is None:
options.cloud_name = "AWS"

Expand All @@ -626,7 +651,10 @@ def main():
version=options.api_version,
poll_interval=options.poll_interval,
skip_ssl_cert_check=options.skip_ssl_cert_check,
cloud_name=options.cloud_name)
cloud_name=options.cloud_name,
base_retry_delay=options.base_retry_delay,
max_retries=options.max_retries
)

if len(args) < 1:
sys.stderr.write("Missing first argument containing subcommand\n")
Expand Down Expand Up @@ -677,11 +705,13 @@ def main():
return usermain(args)
if a0 == "template":
return templatemain(args)
if a0 == "quest":
return questmain(args)

cmdset = set(CommandClasses.keys())
sys.stderr.write("First command must be one of <%s>\n" %
"|".join(cmdset.union(["cluster", "action", "scheduler", "report",
"dbtap", "role", "group", "app", "account", "nezha", "user", "template"])))
"dbtap", "role", "group", "app", "account", "nezha", "user", "template", "quest"])))
usage(optparser)


Expand Down
6 changes: 6 additions & 0 deletions qds_sdk/clusterv2.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@ def set_cluster_info_from_arguments(self, arguments):
disable_autoscale_node_pause=arguments.disable_autoscale_node_pause,
paused_autoscale_node_timeout_mins=arguments.paused_autoscale_node_timeout_mins,
parent_cluster_id=arguments.parent_cluster_id,
parent_cluster_label=arguments.parent_cluster_label,
image_version=arguments.image_version)

def set_cluster_info(self,
Expand Down Expand Up @@ -227,6 +228,7 @@ def set_cluster_info(self,
disable_autoscale_node_pause=None,
paused_autoscale_node_timeout_mins=None,
parent_cluster_id=None,
parent_cluster_label=None,
image_version=None):
"""
Args:
Expand Down Expand Up @@ -366,6 +368,7 @@ def set_cluster_info(self,
self.cluster_info['rootdisk'] = {}
self.cluster_info['rootdisk']['size'] = root_disk_size
self.cluster_info['parent_cluster_id'] = parent_cluster_id
self.cluster_info['parent_cluster_label'] = parent_cluster_label
self.cluster_info['cluster_image_version'] = image_version

self.set_spot_instance_settings(maximum_bid_price_percentage, timeout_for_request,
Expand Down Expand Up @@ -529,6 +532,9 @@ def cluster_info_parser(argparser, action):
dest="parent_cluster_id",
type=int,
help="Id of the parent cluster this hs2 cluster is attached to")
cluster_info.add_argument("--parent-cluster-label",
dest="parent_cluster_label",
help="Label of the parent cluster this hs2 cluster is attached to")
cluster_info.add_argument("--image-version",
dest="image_version",
help="cluster image version")
Expand Down
50 changes: 41 additions & 9 deletions qds_sdk/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,16 @@
import logging
import ssl
import json
import time
import pkg_resources
from requests.adapters import HTTPAdapter
from datetime import datetime
try:
from requests.packages.urllib3.poolmanager import PoolManager
except ImportError:
from urllib3.poolmanager import PoolManager
from qds_sdk.retry import retry
from qds_sdk.exception import *
from functools import wraps


log = logging.getLogger("qds_connection")
Expand All @@ -34,14 +35,18 @@ def init_poolmanager(self, connections, maxsize,block=False):

class Connection:

def __init__(self, auth, rest_url, skip_ssl_cert_check, reuse=True):
def __init__(self, auth, rest_url, skip_ssl_cert_check,
reuse=True, max_retries=5,
base_retry_delay=10):
self.auth = auth
self.rest_url = rest_url
self.skip_ssl_cert_check = skip_ssl_cert_check
self._headers = {'User-Agent': 'qds-sdk-py-%s' % pkg_resources.get_distribution("qds-sdk").version,
'Content-Type': 'application/json'}

self.reuse = reuse
self.max_retries = max_retries
self.base_retry_delay = base_retry_delay
if reuse:
self.session = requests.Session()
self.session.mount('https://', MyAdapter())
Expand All @@ -50,20 +55,46 @@ def __init__(self, auth, rest_url, skip_ssl_cert_check, reuse=True):
self.session_with_retries = requests.Session()
self.session_with_retries.mount('https://', MyAdapter(max_retries=3))

@retry((RetryWithDelay, requests.Timeout, ServerError), tries=6, delay=30, backoff=2)
def retry(ExceptionToCheck, tries=5, delay=10, backoff=2):
def deco_retry(f):
@wraps(f)
def f_retry(self, *args, **kwargs):
if hasattr(self, 'max_retries'):
mtries, mdelay = self.max_retries, self.base_retry_delay
else:
mtries, mdelay = tries, delay
while mtries >= 1:
try:
return f(self, *args, **kwargs)
except ExceptionToCheck as e:
logger = logging.getLogger("retry")
msg = "%s, Retrying in %d seconds..." % (e.__class__.__name__,
mdelay)
logger.info(msg)
time.sleep(mdelay)
mtries -= 1
mdelay *= backoff
return f(self, *args, **kwargs)
return f_retry # true decorator
return deco_retry

@retry((RetryWithDelay, requests.Timeout, ServerError, ApiThrottledRetry))
def get_raw(self, path, params=None):
return self._api_call_raw("GET", path, params=params)

@retry((RetryWithDelay, requests.Timeout, ServerError), tries=6, delay=30, backoff=2)
@retry((RetryWithDelay, requests.Timeout, ServerError, ApiThrottledRetry))
def get(self, path, params=None):
return self._api_call("GET", path, params=params)

@retry(ApiThrottledRetry)
def put(self, path, data=None):
return self._api_call("PUT", path, data)

@retry(ApiThrottledRetry)
def post(self, path, data=None):
return self._api_call("POST", path, data)

@retry(ApiThrottledRetry)
def delete(self, path, data=None):
return self._api_call("DELETE", path, data)

Expand Down Expand Up @@ -111,10 +142,8 @@ def _api_call(self, req_type, path, data=None, params=None):
@staticmethod
def _handle_error(response):
"""Raise exceptions in response to any http errors
Args:
response: A Response object
Raises:
BadRequest: if HTTP error code 400 returned.
UnauthorizedAccess: if HTTP error code 401 returned.
Expand All @@ -134,8 +163,8 @@ def _handle_error(response):

if 'X-Qubole-Trace-Id' in response.headers:
now = datetime.now()
time = now.strftime('%Y-%m-%d %H:%M:%S')
format_list = [time,response.headers['X-Qubole-Trace-Id']]
time_now = now.strftime('%Y-%m-%d %H:%M:%S')
format_list = [time_now, response.headers['X-Qubole-Trace-Id']]
sys.stderr.write("[{}] Request ID is: {}. Please share it with Qubole Support team for any assistance".format(*format_list) + "\n")

if code == 400:
Expand Down Expand Up @@ -164,7 +193,10 @@ def _handle_error(response):
raise RetryWithDelay(response)
elif code == 449:
sys.stderr.write(response.text + "\n")
raise RetryWithDelay(response, "Data requested is unavailable. Retrying ...")
raise RetryWithDelay(response, "Data requested is unavailable. Retrying...")
elif code == 429:
sys.stderr.write(response.text + "\n")
raise ApiThrottledRetry(response, "Too many requests. Retrying...")
elif 401 <= code < 500:
sys.stderr.write(response.text + "\n")
raise ClientError(response)
Expand Down
6 changes: 6 additions & 0 deletions qds_sdk/exception.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,3 +85,9 @@ class MethodNotAllowed(ClientError):
"""An error raised when a method is not allowed."""
# 405 Method Not Allowed
pass


class ApiThrottledRetry(ClientError):
"""An error raised when upstream requests are throttled."""
# 429 Too Many Requests
pass
51 changes: 36 additions & 15 deletions qds_sdk/qubole.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ class Qubole:
"""

MIN_POLL_INTERVAL = 1
RETRIES_CAP = 5
MAX_RETRY_DELAY = 10

_auth = None
api_token = None
Expand All @@ -31,22 +33,24 @@ class Qubole:
cloud_name = None
cached_agent = None
cloud = None
base_retry_delay = None
max_retries = None

@classmethod
def configure(cls, api_token,
api_url="https://api.qubole.com/api/", version="v1.2",
poll_interval=5, skip_ssl_cert_check=False, cloud_name="AWS"):
poll_interval=5, skip_ssl_cert_check=False, cloud_name="AWS",
base_retry_delay=10, max_retries=5):
"""
Set parameters governing interaction with QDS
Args:
`api_token`: authorization token for QDS. required
`api_url`: the base URL for QDS API. configurable for testing only
`version`: QDS REST api version. Will be used throughout unless overridden in Qubole.agent(..)
`poll_interval`: interval in secs when polling QDS for events
`delay` : interval in secs to sleep in between successive retries
`retries` : maximum number of time to retry an api call in case
of retryable exception.
"""

cls._auth = QuboleAuth(api_token)
Expand All @@ -61,34 +65,52 @@ def configure(cls, api_token,
cls.skip_ssl_cert_check = skip_ssl_cert_check
cls.cloud_name = cloud_name.lower()
cls.cached_agent = None

if base_retry_delay > Qubole.MAX_RETRY_DELAY:
log.warn("Sleep between successive retries cannot be greater than"
" %s seconds."
" Setting it to"
" %s seconds.\n"
% (Qubole.MAX_RETRY_DELAY, Qubole.MAX_RETRY_DELAY))
cls.base_retry_delay = Qubole.MAX_RETRY_DELAY
else:
cls.base_retry_delay = base_retry_delay
if max_retries > Qubole.RETRIES_CAP:
log.warn("Maximum retries cannot be greater than %s."
" Setting it to"
" default - %s.\n" % (Qubole.RETRIES_CAP, Qubole.RETRIES_CAP))
cls.max_retries = Qubole.RETRIES_CAP
else:
cls.max_retries = max_retries

@classmethod
def agent(cls, version=None):
"""
Returns:
a connection object to make REST calls to QDS
optionally override the `version` of the REST endpoint for advanced
features available only in the newer version of the API available
for certain resource end points eg: /v1.3/cluster. When version is
None we default to v1.2
"""
reuse_cached_agent = True
if version:
log.debug("api version changed to %s" % version)
cls.rest_url = '/'.join([cls.baseurl.rstrip('/'), version])
reuse_cached_agent = False
log.debug("api version changed to %s" % version)
cls.rest_url = '/'.join([cls.baseurl.rstrip('/'), version])
reuse_cached_agent = False
else:
cls.rest_url = '/'.join([cls.baseurl.rstrip('/'), cls.version])
cls.rest_url = '/'.join([cls.baseurl.rstrip('/'), cls.version])
if cls.api_token is None:
raise ConfigError("No API Token specified - please supply one via Qubole.configure()")

if not reuse_cached_agent:
uncached_agent = Connection(cls._auth, cls.rest_url, cls.skip_ssl_cert_check)
return uncached_agent
uncached_agent = Connection(cls._auth, cls.rest_url,
cls.skip_ssl_cert_check,
True, cls.max_retries, cls.base_retry_delay)
return uncached_agent
if cls.cached_agent is None:
cls.cached_agent = Connection(cls._auth, cls.rest_url, cls.skip_ssl_cert_check)
cls.cached_agent = Connection(cls._auth, cls.rest_url,
cls.skip_ssl_cert_check,
True, cls.max_retries, cls.base_retry_delay)

return cls.cached_agent

Expand Down Expand Up @@ -121,4 +143,3 @@ def get_cloud_object(cls, cloud_name):
elif cloud_name.lower() == "gcp":
import qds_sdk.cloud.gcp_cloud
return qds_sdk.cloud.gcp_cloud.GcpCloud()

Loading

0 comments on commit 1e885e9

Please sign in to comment.