Skip to content

Commit

Permalink
Additional user methods, refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
tjarrettveracode committed Oct 22, 2020
1 parent 4bdf401 commit 17bc1a7
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 52 deletions.
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,12 @@ The following methods call Veracode REST APIs and return JSON.
- `scantype`: Defaults to STATIC findings, but can be STATIC, DYNAMIC, MANUAL, SCA, or ALL (static, dynamic, manual).
- `annot`: Defaults to TRUE but can be FALSE
- `get_users()`: get a list of users for the organization.
- `get_user_self()`: get user information for the current user.
- `get_user(user_guid)`: get information for an individual user based on `user_guid`.
- `get_user_by_name(username)`: look up info for an individual user based on their user_name.
- `get_creds()`: get credentials information (API ID and expiration date) for the current user.
- `update_user(user_guid, roles)`: update the user identified by `user_guid` with the list of roles passed in `roles`. Because the Identity API does not support adding a single role, the list should be the entire list of existing roles for the user plus whatever new roles. See [veracode-user-bulk-role-assign](https://github.com/tjarrettveracode/veracode-user-bulk-role-assign) for an example.
- `disable_user(user_guid)`: set the `Active` flag the user identified by `user_guid` to `False`.
- `get_workspaces()`: get a list of SCA Agent workspaces for the organization.
- `get_workspace_by_name(name)`: get a list of SCA Agent workspaces whose name partially matches `name`.
- `create_workspace(name)`: create an SCA Agent workspace named `name`. Returns the GUID for the workspace.
Expand Down
6 changes: 3 additions & 3 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,19 @@
setup(
name = 'veracode_api_py',
packages = ['veracode_api_py'],
version = '0.9',
version = '0.9.1',
license='MIT',
description = 'Python helper library for working with the Veracode APIs. Handles retries, pagination, and other features of the modern Veracode REST APIs.',
author = 'Tim Jarrett',
author_email = '[email protected]',
url = 'https://github.com/tjarrettveracode',
download_url = 'https://github.com/tjarrettveracode/veracode-api-py/archive/v_07.tar.gz',
download_url = 'https://github.com/tjarrettveracode/veracode-api-py/archive/v_091.tar.gz',
keywords = ['veracode', 'veracode-api'],
install_requires=[
'veracode-api-signing'
],
classifiers=[
'Development Status :: 3 - Alpha', # Chose either "3 - Alpha", "4 - Beta" or "5 - Production/Stable" as the current state of your package
'Development Status :: 4 - Beta', # Chose either "3 - Alpha", "4 - Beta" or "5 - Production/Stable" as the current state of your package
'Intended Audience :: Developers',
'Topic :: Software Development :: Build Tools',
'License :: OSI Approved :: MIT License',
Expand Down
107 changes: 58 additions & 49 deletions veracode_api_py/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,10 @@
import time
import requests
import logging
import json
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry


from veracode_api_signing.exceptions import VeracodeAPISigningException
from veracode_api_signing.plugin_requests import RequestsAuthPluginVeracodeHMAC
Expand Down Expand Up @@ -64,63 +67,54 @@ def _request(self, url, method, params=None):
raise VeracodeAPIError(e)

def _rest_request(self, url, method, params=None,body=None,fullresponse=False):
# base request method for a REST request
myheaders = {"User-Agent": "api.py"}
if method in ["POST", "PUT"]:
myheaders.update({'Content-type': 'application/json'})
# base request method for a REST request
myheaders = {"User-Agent": "api.py"}
if method in ["POST", "PUT"]:
myheaders.update({'Content-type': 'application/json'})

retry_strategy = Retry(total=3,
status_forcelist=[429, 500, 502, 503, 504],
method_whitelist=["HEAD", "GET", "OPTIONS"]
)
session = requests.Session()
session.mount(self.base_rest_url, HTTPAdapter(max_retries=retry_strategy))

try:
if method == "GET":
# incorporate retries to deal with some failure situations
try:
session = requests.Session()
session.mount(self.base_rest_url, HTTPAdapter(max_retries=3))
request = requests.Request(method, self.base_rest_url + url, params=params, auth=RequestsAuthPluginVeracodeHMAC(), headers=myheaders)
prepared_request = request.prepare()
r = session.send(prepared_request, proxies=self.proxies)
if r.status_code == 500 or r.status_code == 504:
time.sleep(1)
r = requests.Request(method, url, params=params, auth=RequestsAuthPluginVeracodeHMAC(),headers=myheaders,json=body)
except requests.exceptions.RequestException as e:
logging.exception(self.connect_error_msg)
raise VeracodeAPIError(e)
request = requests.Request(method, self.base_rest_url + url, params=params, auth=RequestsAuthPluginVeracodeHMAC(), headers=myheaders)
prepared_request = request.prepare()
r = session.send(prepared_request, proxies=self.proxies)
elif method == "POST":
try:
r = requests.post(self.base_rest_url + url,params=params,auth=RequestsAuthPluginVeracodeHMAC(),headers=myheaders,data=body)
except requests.exceptions.RequestException as e:
logging.exception(self.connect_error_msg)
raise VeracodeAPIError(e)
r = requests.post(self.base_rest_url + url,params=params,auth=RequestsAuthPluginVeracodeHMAC(),headers=myheaders,data=body)
elif method == "PUT":
try:
r = requests.put(self.base_rest_url + url,params=params,auth=RequestsAuthPluginVeracodeHMAC(), headers=myheaders,data=body)
except requests.exceptions.RequestException as e:
logging.exception(self.connect_error_msg)
raise VeracodeAPIError(e)
r = requests.put(self.base_rest_url + url,params=params,auth=RequestsAuthPluginVeracodeHMAC(), headers=myheaders,data=body)
elif method == "DELETE":
try:
r = requests.delete(self.base_rest_url + url,params=params,auth=RequestsAuthPluginVeracodeHMAC(),headers=myheaders)
except requests.exceptions.RequestException as e:
logging.exception(self.connect_error_msg)
raise VeracodeAPIError(e)
r = requests.delete(self.base_rest_url + url,params=params,auth=RequestsAuthPluginVeracodeHMAC(),headers=myheaders)
else:
raise VeracodeAPIError("Unsupported HTTP method")
except requests.exceptions.RequestException as e:
logging.exception(self.connect_error_msg)
raise VeracodeAPIError(e)

if not (r.status_code == requests.codes.ok):
logging.debug("API call returned non-200 HTTP status code: {}".format(r.status_code))
if not (r.status_code == requests.codes.ok):
logging.debug("API call returned non-200 HTTP status code: {}".format(r.status_code))

if not (r.ok):
logging.debug("Error retrieving data. HTTP status code: {}".format(r.status_code))
if r.status_code == 401:
logging.exception("Check that your Veracode API account credentials are correct.")
else:
logging.exception("Error [{}]: {} for request {}".format(r.status_code,r.text, r.request.url))
raise requests.exceptions.RequestException()
elif fullresponse:
return r
if not (r.ok):
logging.debug("Error retrieving data. HTTP status code: {}".format(r.status_code))
if r.status_code == 401:
logging.exception("Check that your Veracode API account credentials are correct.")
else:
if r.text != "":
return r.json()
else:
return ""
logging.exception("Error [{}]: {} for request {}".
format(r.status_code, r.text, r.request.url))
raise requests.exceptions.RequestException()

if fullresponse:
return r

if r.text != "":
return r.json()
else:
return ""

def _rest_paged_request(self, url, method, element, params=None):
all_data = []
Expand All @@ -137,7 +131,7 @@ def _rest_paged_request(self, url, method, element, params=None):
page += 1
more_pages = page < total_pages
return all_data

#xml apis

def get_app_list(self):
Expand Down Expand Up @@ -233,18 +227,33 @@ def get_users(self):
request_params = {'page': 0} #initialize the page request
return self._rest_paged_request("api/authn/v2/users","GET","users",request_params)

def get_user_self (self):
#Gets the user info for the current user, using the Veracode Identity API
return self._rest_request("api/authn/v2/users/self","GET")

def get_user(self,user_guid):
#Gets an individual user provided their GUID, using the Veracode Identity API
uri = "api/authn/v2/users/{}".format(user_guid)
return self._rest_request(uri,"GET")

def get_user_by_name(self,username):
#Gets all the users who match the provided email address, using the Veracode Identity API
request_params = {'user_name': username} #initialize the page request
return self._rest_paged_request("api/authn/v2/users","GET","users",request_params)

def get_creds (self):
return self._rest_request("api/authn/v2/api_credentials","GET")

def update_user (self,user_guid,roles):
request_params = {'partial':'TRUE',"incremental": 'TRUE'}
uri = "api/authn/v2/users/{}".format(user_guid)
return self._rest_request(uri,"PUT",request_params,roles)
return self._rest_request(uri,"PUT",request_params,roles)

def disable_user (self,user_guid):
request_params = {'partial':'TRUE'}
uri = 'api/authn/v2/users/{}'.format(user_guid)
payload = json.dumps({'active': False})
return self._rest_request(uri,"PUT",request_params,payload)

## SCA APIs - note must be human user to use these, not API user

Expand Down

0 comments on commit 17bc1a7

Please sign in to comment.