Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dev jobexecution #106

Draft
wants to merge 15 commits into
base: master
Choose a base branch
from
7 changes: 7 additions & 0 deletions doc/api/services/compute.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
sasctl.services.compute
=======================

.. automodule:: sasctl._services.compute
:members:
:undoc-members:
:show-inheritance:
7 changes: 7 additions & 0 deletions doc/api/services/job_definitions.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
sasctl.services.job_definitions
===============================

.. automodule:: sasctl._services.job_definitions
:members:
:undoc-members:
:show-inheritance:
7 changes: 7 additions & 0 deletions doc/api/services/job_execution.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
sasctl.services.job_execution
=============================

.. automodule:: sasctl._services.job_execution
:members:
:undoc-members:
:show-inheritance:
248 changes: 248 additions & 0 deletions src/sasctl/_services/compute.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,248 @@
#!/usr/bin/env python
# encoding: utf-8
#
# Copyright © 2019, SAS Institute Inc., Cary, NC, USA. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0

from .service import Service


class Compute(Service):
"""The Compute service affords CRUD operations for Compute Contexts.

A Compute context is analogous to the SAS Application Server from SAS V9.
"""

_SERVICE_ROOT = '/compute'

list_contexts, get_context, update_context, delete_context = Service._crud_funcs(
'/contexts'
)

list_servers, get_server, _, stop_server = Service._crud_funcs('/servers')

_, _, _, delete_session = Service._crud_funcs('/sessions')

@classmethod
def list_sessions(cls, server=None):
"""List currently active sessions.

Parameters
----------
server : str or dict, optional
The name or id of the server, or a dictionary representation of the server. If specified, only sessions
for that server will be returned. Otherwise, sessions for all running servers are returned.

Returns
-------
list of RestObj

Raises
------
ValueError
If `server` is specified but not found.

"""
if server is not None:
server_obj = cls.get_server(server)
if server_obj is None:
raise ValueError("Unable to find server '%s'." % server)
uri = '/servers/%s/sessions' % server_obj['id']

Check warning on line 50 in src/sasctl/_services/compute.py

View check run for this annotation

Codecov / codecov/patch

src/sasctl/_services/compute.py#L46-L50

Added lines #L46 - L50 were not covered by tests
else:
uri = '/sessions'

Check warning on line 52 in src/sasctl/_services/compute.py

View check run for this annotation

Codecov / codecov/patch

src/sasctl/_services/compute.py#L52

Added line #L52 was not covered by tests

results = cls.get(uri)
if isinstance(results, list):
return results

Check warning on line 56 in src/sasctl/_services/compute.py

View check run for this annotation

Codecov / codecov/patch

src/sasctl/_services/compute.py#L54-L56

Added lines #L54 - L56 were not covered by tests

return [results]

Check warning on line 58 in src/sasctl/_services/compute.py

View check run for this annotation

Codecov / codecov/patch

src/sasctl/_services/compute.py#L58

Added line #L58 was not covered by tests

@classmethod
def get_server_state(cls, server):
"""

Parameters
----------
server : str or dict
The name or id of the server, or a dictionary representation of the server.


Returns
-------
str
{'running', 'stopped'}

"""
server_obj = cls.get_server(server)
uri = '/servers/%s/state' % server_obj['id']

Check warning on line 77 in src/sasctl/_services/compute.py

View check run for this annotation

Codecov / codecov/patch

src/sasctl/_services/compute.py#L76-L77

Added lines #L76 - L77 were not covered by tests

return cls.get(uri)

Check warning on line 79 in src/sasctl/_services/compute.py

View check run for this annotation

Codecov / codecov/patch

src/sasctl/_services/compute.py#L79

Added line #L79 was not covered by tests

@classmethod
def create_job(
cls,
session,
code,
name=None,
description=None,
environment=None,
variables=None,
resources=None,
attributes=None,
):
"""

Parameters
----------
session : str or dict
code : str
name
description
environment
variables
resources
attributes

Returns
-------

"""

# TODO: if code is URI pass in codeUri field.
code = code.split('\n')

uri = '/sessions/%s/jobs' % session['id']

# NOTE: variables must be None not []
resources = resources or []
environment = environment or {}
attributes = attributes or {}

data = {
'version': 3,
'name': name,
'description': description,
'environment': environment,
'variables': variables,
'code': code,
'resources': resources,
'attributes': attributes,
}

return cls.post(uri, json=data)
# return cls.request_link(session, 'execute', json=data)

@classmethod
def create_session(cls, context):
"""Create a new session based on an existing Compute Context.

If a reusable SAS Compute Server is available to handle this session, a new session is created on that SAS
Compute Server. Otherwise, a new SAS Compute Server is created and the new session is created there.

Parameters
----------
context : RestObj
An existing Compute Context as returned by `get_context`.

Returns
-------
RestObj
Session details

"""
context = cls.get_context(context)

return cls.request_link(context, 'createSession')

@classmethod
def get_listing(cls, job=None, session=None):
"""Retrieve the SAS listing (program output) for a job or an entire session.

Parameters
----------
job : dict, optional
Dictionary representation of an existing job.
session : dict, optional
Dictionary representation of an active session.

Returns
-------
list of RestObj

Raises
------
ValueError
If neither `job` nor `session` are provided.

"""
if job is None and session is None:
raise ValueError("Either a job or a session must be specified.")

if job:
uri = '/sessions/%s/jobs/%s/listing' % (job['sessionId'], job['id'])

Check warning on line 182 in src/sasctl/_services/compute.py

View check run for this annotation

Codecov / codecov/patch

src/sasctl/_services/compute.py#L181-L182

Added lines #L181 - L182 were not covered by tests
else:
uri = '/sessions/%s/listing' % session['id']

Check warning on line 184 in src/sasctl/_services/compute.py

View check run for this annotation

Codecov / codecov/patch

src/sasctl/_services/compute.py#L184

Added line #L184 was not covered by tests

return cls.get(uri)

Check warning on line 186 in src/sasctl/_services/compute.py

View check run for this annotation

Codecov / codecov/patch

src/sasctl/_services/compute.py#L186

Added line #L186 was not covered by tests

@classmethod
def get_log(cls, job=None, session=None):
"""Retrieve the SAS log for a job or an entire session.

Parameters
----------
job : dict, optional
Dictionary representation of an existing job.
session : dict, optional
Dictionary representation of an active session.

Returns
-------
list of RestObj

Raises
------
ValueError
If neither `job` nor `session` are provided.

"""
if job is None and session is None:
raise ValueError("Either a job or a session must be specified.")

if job:
uri = '/sessions/%s/jobs/%s/log' % (job['sessionId'], job['id'])

Check warning on line 213 in src/sasctl/_services/compute.py

View check run for this annotation

Codecov / codecov/patch

src/sasctl/_services/compute.py#L212-L213

Added lines #L212 - L213 were not covered by tests
else:
uri = '/sessions/%s/log' % session['id']

Check warning on line 215 in src/sasctl/_services/compute.py

View check run for this annotation

Codecov / codecov/patch

src/sasctl/_services/compute.py#L215

Added line #L215 was not covered by tests

return cls.get(uri)

Check warning on line 217 in src/sasctl/_services/compute.py

View check run for this annotation

Codecov / codecov/patch

src/sasctl/_services/compute.py#L217

Added line #L217 was not covered by tests

@classmethod
def get_results(cls, job=None, session=None):
"""Retrieve the output files for a job or an entire session.

Parameters
----------
job : dict, optional
Dictionary representation of an existing job.
session : dict, optional
Dictionary representation of an active session.

Returns
-------
list of RestObj

Raises
------
ValueError
If neither `job` nor `session` are provided.

"""
if job is None and session is None:
raise ValueError("Either a job or a session must be specified.")

if job:
uri = '/sessions/%s/jobs/%s/results' % (job['sessionId'], job['id'])

Check warning on line 244 in src/sasctl/_services/compute.py

View check run for this annotation

Codecov / codecov/patch

src/sasctl/_services/compute.py#L243-L244

Added lines #L243 - L244 were not covered by tests
else:
uri = '/sessions/%s/results' % session['id']

Check warning on line 246 in src/sasctl/_services/compute.py

View check run for this annotation

Codecov / codecov/patch

src/sasctl/_services/compute.py#L246

Added line #L246 was not covered by tests

return cls.get(uri)

Check warning on line 248 in src/sasctl/_services/compute.py

View check run for this annotation

Codecov / codecov/patch

src/sasctl/_services/compute.py#L248

Added line #L248 was not covered by tests
122 changes: 122 additions & 0 deletions src/sasctl/_services/job_definitions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
#!/usr/bin/env python
# encoding: utf-8
#
# Copyright © 2019, SAS Institute Inc., Cary, NC, USA. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0

from .service import Service
from ..core import RestObj


class JobDefinitions(Service):
"""
Implements the Job Definitions REST API.

The Job Definitions API manages jobs using the Create, Read, Update, and Delete operations. A Job Definition
is a batch execution that contains a list of input parameters, a job type, and a "code" attribute. A job
definition can run in multiple execution environments, based on the type of the job.

See Also
--------
`REST Documentation <https://developer.sas.com/apis/rest/Compute/#job-definitions>`_
"""

_SERVICE_ROOT = '/jobDefinitions'

(
list_definitions,
get_definition,
update_definition,
delete_definition,
) = Service._crud_funcs('/definitions')

@classmethod
def create_definition(
cls,
name: str = None,
description: str = None,
type_: str = None,
code: str = None,
parameters=None,
properties: dict = None,
) -> RestObj:
"""Define a new job that can be run in the SAS environment

Parameters
----------
name : str
Job name
description : str
Job description
type_ : {'casl', 'Compute'}
Indicates type of code specified by `code`. Use 'casl' if `code` is CASL or 'Compute' if `code` is
data step.
code : str
Code to be executed whenever this job is run.
parameters : list of dict
List of parameters used by the job. Each entry in the list should be a dictionary with the following items:
- name : str
parameter name
- type : {'character', 'date', 'numeric', 'table'}
parameter type
- label : str
human-readable label for parameter
- required : bool
is it required to specify a parameter value when running the job
- defaultValue : any
parameter's default value if not specified when running job
properties : dict
An arbitrary collection of name/value pairs to associate with the job definition.

Returns
-------
RestObj

"""

# Convert name:value pairs of properties to separate "name" & "value" fields.
properties = properties or {}
properties = [{'name': k, 'value': v} for k, v in properties.items()]

parameters = parameters or []

clean_parameters = []
for param in parameters:
param_type = str(param.get('type', '')).upper()
if param_type not in ('TABLE', 'NUMERIC', 'DATE', 'CHARACTER'):
raise ValueError(
"Type '{}' for parameter '{}' is invalid. "
"Expected one of ('TABLE', 'NUMERIC', 'DATE', 'CHARACTER')".format(
param_type, param['name']
)
)

new_param = {
'version': 1,
'name': str(param.get('name', ''))[
:100
], # Max length of 100 characters
'type': param_type,
'label': str(param.get('label', ''))[
:250
], # Max length of 250 characters
'required': bool(param.get('required')),
'defaultValue': param.get('defaultValue'),
}
clean_parameters.append(new_param)

definition = {
'version': 2,
'name': name,
'description': description,
'type': type_,
'code': code,
'parameters': clean_parameters,
'properties': properties,
}

return cls.post(
'/definitions',
json=definition,
headers={'Accept': 'application/vnd.sas.job.definition+json'},
)
Loading