# This workflow will upload a Python Package using Twine when a release is created
# For more information see: https://help.github.com/en/actions/language-and-framework-guides/using-python-with-github-actions#publishing-to-package-registries
+# This workflow uses actions that are not certified by GitHub.
+# They are provided by a third-party and are governed by
+# separate terms of service, privacy policy, and support
+# documentation.
+name: Upload Python Package
+ release:
+ types: [published]
+ deploy:
+ runs-on: ubuntu-latest
+ steps:
+ - uses: actions/checkout@v2
+ - name: Set up Python
+ uses: actions/setup-python@v2
+ with:
+ python-version: '3.x'
+ - name: Install dependencies
+ run: |
+ python -m pip install --upgrade pip
+ pip install build
+ - name: Build package
+ run: python -m build
+ - name: Publish package
+ uses: pypa/gh-action-pypi-publish@27b31702a0e7fc50959f5ad993c78deac1bdfc29
+ with:
+ user: __token__
+ password: ${{ secrets.PYPI_API_TOKEN }}
+name: Tests
+ - push
+ - pull_request
+ test:
+ runs-on: ${{ matrix.os}}
+ strategy:
+ matrix:
+ os: [windows-latest]
+ python-version: ['3.9']
+ steps:
+ - uses: actions/checkout@v2
+ - name: Set up Python ${{ matrix.python-version }}
+ uses: actions/setup-python@v2
+ with:
+ python-version: ${{ matrix.python-version }}
+ - name: Install dependencies
+ run: |
+ python -m pip install --upgrade pip
+ pip install tox tox-gh-actions
+ echo '${{ secrets.private }}' > private.pem
+ - name: Test with tox
+ run: >
+ tox -- -x --user ${{ secrets.user }}
+ --pw ${{ secrets.pw }}
+ --clientId ${{ secrets.client_id }}
+ --tokenUrl ${{ secrets.token_url }}
+ --apiUrl ${{ secrets.api_url }}
+ --apiUrlAnalytics ${{ secrets.api_url_analytics }}
+ --assertionType '${{ secrets.assertion_type }}'
+ --scope '${{ secrets.scope }}'
+ --profileIdType ${{ secrets.profile_id_type }}
\ No newline at end of file
# Burgiss API
## Description
-This package simplifies the connection to the Burgiss API and is built on top of the requests package
This package simplifies the connection to the Burgiss API and flattens API responses to dataframes.
## Authentication Setup
The class burgissApiAuth handles all the JWT token authentication but there are a few prerequesite requirements for the authentication.
@@ -21,7 +23,8 @@ pip install burgiss-api
## Usage
-Data can be updated via the api, to enable this you must change the scope in the config file and specify the request type.
+### Get requests
+Request method defaults to get
from burgissApi import burgissApiSession
@@ -38,9 +41,24 @@ lookUpValues = burgissSession.request('LookupValues', profileIdAsHeader=True)
# Optional Parameters
investments = burgissSession.request('investments', optionalParameters='&includeInvestmentNotes=false&includeCommitmentHistory=false&includeInvestmentLiquidationNotes=false')
+### Put requests
+Must add optional parameters for requestType and data
from burgissApi import burgissApiSession
+# Initiate a session and get profile id for subsequent calls (obtains auth token)
+burgissSession = burgissApiSession()
+# When creating a put request, all fields must be present
+data = {'someJsonObject':'data'}
+# Specify the request type
+orgs = burgissSession.request('some endpoint', requestType='PUT', data=data)
## Transformed Data Requests
-Some endpoints are supported for transformation to a flattened dataframe instead of a raw json
Receive a flattened dataframe instead of a raw json from api
from burgissApi import burgissApi
@@ -50,21 +68,6 @@ apiSession = burgissApi()
orgs = apiSession.getData('orgs')
-Supported Endpoints
-| -------|
-|orgs details|
-|investments transactions|
## Analytics API
from burgissApi import burgissApiSession
@@ -72,7 +75,16 @@ from burgissApi import burgissApiSession
# Initiate a session and get profile id for subsequent calls (obtains auth token)
burgissSession = burgissApiSession()
+# Get grouping fields
burgissSession.request('analyticsGroupingFields', analyticsApi=True, profileIdAsHeader=True)
+# Specify inputs for point in time analyis
+analysisJson = pointInTimeAnalyisInput(analysisParameters, globalMeasureParameters,
+ measures, measureStartDateReference, measureEndDateReference, dataCriteria, groupBy)
+# Send post request to receive data
+burgissSession.request('pointinTimeAnalysis', analyticsApi=True,
+ profileIdAsHeader=True, requestType='POST', data=analysisJson)
@@ -123,4 +135,4 @@ burgissSession.request('analyticsGroupingFields', analyticsApi=True, profileIdAs
- [Burgiss API Documentation](https://api.burgiss.com/v2/docs/index.html)
- [Burgiss Analytics API Documentation](https://api-analytics.burgiss.com/swagger/index.html)
- [Burgiss API Token Auth Documentation](https://burgiss.docsend.com/view/fcqygcx)
-- [Pypi Package](https://pypi.org/project/burgiss-api/)
\ No newline at end of file
- [Pypi Package](https://pypi.org/project/burgiss-api/)
@@ -1,5 +1,4 @@
import configparser
-import json
import logging
import uuid
from datetime import datetime, timedelta
@@ -11,14 +10,16 @@
from cryptography.hazmat.primitives import serialization
from OpenSSL import crypto
-# Create logging file for debugging
-for handler in logging.root.handlers[:]:
- logging.root.removeHandler(handler)
- logging.basicConfig(filename='burgissApi.log',
- encoding='utf-8', level=logging.DEBUG,
- format='%(asctime)s %(message)s', datefmt='%m/%d/%Y %I:%M:%S %p')
-logger = logging.getLogger('burgissApi')
-filehandler_dbg = logging.FileHandler(logger.name + '.log', mode='w')
+# Create and configure logger
+ format='%(asctime)s %(message)s',
+ filemode='w')
+# Creating an object
+logger = logging.getLogger()
+# Setting the threshold of logger to DEBUG
class ApiConnectionError(Exception):
@@ -30,42 +31,82 @@ def responseCodeHandling(response):
Handle request responses and log if there are errors
+ knownResponseCodes = {400: 'Unauthorized', 401: 'Forbidden', 404: 'Not Found', 500: 'Internal Server Error', 503: 'Service Unavailable'}
if response.status_code == 200:
return response
- elif response.status_code == 404:
+ elif response.status_code in knownResponseCodes.keys():
- "Url not found")
+ f"API Connection Failure: Error Code {response.status_code}, {knownResponseCodes[response.status_code]}")
raise ApiConnectionError(
- 'Url not found, check the logs for the specific url!')
+ f"Error Code {response.status_code}, {knownResponseCodes[response.status_code]}")
- logger.error(
- f"API Connection Failure: Error Code {response.status_code}")
raise ApiConnectionError(
'No recognized reponse from Burgiss API, Check BurgissApi.log for details')
-def lowerDictKeys(d):
+def tokenErrorHandling(tokenResponseJson: dict):
+ # Error Handling
+ if 'access_token' in tokenResponseJson.keys():
+ logger.info("Token request successful!")
+ return tokenResponseJson['access_token']
+ elif 'error' in tokenResponseJson.keys():
+ logging.error(
+ f"API Connection Error: {tokenResponseJson['error']}")
+ raise ApiConnectionError(
+ 'Check BurgissApi.log for details')
+ elif 'status_code' in tokenResponseJson.keys():
+ logging.error(
+ f"API Connection Error: Error Code {tokenResponseJson ['status_code']}")
+ raise ApiConnectionError(
+ 'Check BurgissApi.log for details')
+ else:
+ logging.error("Cannot connect to endpoint")
+ raise ApiConnectionError(
+ 'No recognized reponse from Burgiss API, Check BurgissApi.log for details')
+def lowerDictKeys(d: dict):
newDict = dict((k.lower(), v) for k, v in d.items())
return newDict
-class burgissApiAuth(object):
+class tokenAuth(object):
Create and send a signed client token to receive a bearer token from the burgiss api endpoint
- def __init__(self):
+ def __init__(self, clientId=None, username=None, password=None, urlToken=None, urlApi=None, analyticsUrlApi=None, assertionType=None, scope=None):
logger.info("Import client details from config file")
config = configparser.ConfigParser()
- config.read_file(open('config.cfg'))
- self.clientId = config.get('API', 'clientId')
- self.username = config.get('API', 'user')
- self.password = config.get('API', 'pw')
- self.urlToken = config.get('API', 'tokenUrl')
- self.urlApi = config.get('API', 'apiUrl')
- self.analyticsUrlApi = config.get('API', 'apiUrlAnalytics')
- self.assertionType = config.get('API', 'assertionType')
- self.scope = config.get('API', 'scope')
+ try:
+ config.read_file(open('config.cfg'))
+ self.clientId = config.get('API', 'clientId')
+ self.username = config.get('API', 'user')
+ self.password = config.get('API', 'pw')
+ self.urlToken = config.get('API', 'tokenUrl')
+ self.urlApi = config.get('API', 'apiUrl')
+ self.analyticsUrlApi = config.get('API', 'apiUrlAnalytics')
+ self.assertionType = config.get('API', 'assertionType')
+ self.scope = config.get('API', 'scope')
+ except Exception as e:
+ logging.error(e)
+ print('Config file not found, is it located in your cwd?')
+ if clientId is not None:
+ self.clientId = clientId
+ if username is not None:
+ self.username = username
+ if password is not None:
+ self.password = password
+ if urlToken is not None:
+ self.urlToken = urlToken
+ if urlApi is not None:
+ self.urlApi = urlApi
+ if analyticsUrlApi is not None:
+ self.analyticsUrlApi = analyticsUrlApi
+ if assertionType is not None:
+ self.assertionType = assertionType
+ if scope is not None:
+ self.scope = scope
logger.info("Client details import complete!")
def getBurgissApiToken(self):
@@ -86,7 +127,7 @@ def getBurgissApiToken(self):
headers = {
'alg': 'RS256',
- 'kid': crypto.X509().digest('sha1').decode('utf-8').replace(':', ''),
+ 'kid': crypto.X509().digest('sha1').decode('utf-8').replace(':', ''), # type: ignore
'typ': 'JWT'
payload = {
@@ -99,9 +140,13 @@ def getBurgissApiToken(self):
'aud': self.urlToken
- logger.info("Encode client assertion with jwt")
- clientToken = jwt.encode(
- payload, secret_key, headers=headers, algorithm='RS256')
+ logger.info("Encoding client assertion with jwt")
+ try:
+ clientToken = jwt.encode(
+ payload, secret_key, headers=headers, algorithm='RS256') # type: ignore
+ logger.info("Encoding complete!")
+ except Exception as e:
+ logging.error(e)
payload = {
'grant_type': 'password',
@@ -117,41 +162,34 @@ def getBurgissApiToken(self):
tokenResponse = requests.request(
'POST', self.urlToken, data=payload
- tokenResponseJson = tokenResponse.json()
- # Error Handling
- if 'access_token' in tokenResponseJson.keys():
- logger.info("Token request successful!")
- return tokenResponseJson['access_token']
- elif 'error' in tokenResponseJson.keys():
- logging.error(
- f"API Connection Error: {tokenResponseJson['error']}")
- raise ApiConnectionError(
- 'Check BurgissApi.log for details')
- elif 'status_code' in tokenResponseJson.keys():
- logging.error(
- f"API Connection Error: Error Code {tokenResponseJson ['status_code']}")
- raise ApiConnectionError(
- 'Check BurgissApi.log for details')
- else:
- logging.error("Cannot connect to endpoint")
- raise ApiConnectionError(
- 'No recognized reponse from Burgiss API, Check BurgissApi.log for details')
+ return tokenErrorHandling(tokenResponse.json())
-class burgissApiInit(burgissApiAuth):
+class init(tokenAuth):
- Initializes a session for all subsequent calls using the burgissApiAuth class
+ Initializes a session for all subsequent calls using the tokenAuth class
- def __init__(self):
- self.auth = burgissApiAuth()
+ def __init__(self, clientId=None, username=None, password=None, urlToken=None, urlApi=None, analyticsUrlApi=None, assertionType=None, scope=None):
+ self.auth = tokenAuth(clientId, username, password, urlToken, urlApi, analyticsUrlApi, assertionType, scope)
self.token = self.auth.getBurgissApiToken()
self.tokenExpiration = datetime.utcnow() + timedelta(seconds=3600)
self.urlApi = self.auth.urlApi
self.analyticsUrlApi = self.auth.analyticsUrlApi
- def request(self, url: str, analyticsApi: bool = False, requestType: str = 'GET', profileIdHeader: bool = False, data=''):
+ def checkTokenExpiration(self):
+ """
+ Check if token is expired, if it is get a new token
+ """
+ logger.info('Check if token has expired')
+ if self.tokenExpiration < datetime.utcnow():
+ logger.info('Token has expired, getting new token')
+ self.token = self.auth.getBurgissApiToken()
+ self.tokenExpiration = datetime.utcnow() + timedelta(seconds=3600)
+ else:
+ logger.info('Token is still valid')
+ def requestWrapper(self, url: str, analyticsApi: bool = False, requestType: str = 'GET', profileIdHeader: bool = False, data=''):
Burgiss api request call, handling bearer token auth in the header with token received when class initializes
@@ -163,18 +201,10 @@ def request(self, url: str, analyticsApi: bool = False, requestType: str = 'GET'
Response [json]: Data from url input
- # Check if token is expired, if it is get a new token
- logger.info('Check if token has expired')
- if self.tokenExpiration < datetime.utcnow():
- logger.info('Token has expired, getting new token')
- self.token = self.auth.getBurgissApiToken()
- self.tokenExpiration = datetime.utcnow() + timedelta(seconds=3600)
- else:
- logger.info('Token is still valid')
+ self.checkTokenExpiration()
# Default to regular api but allow for analytics url
- if analyticsApi == False:
+ if analyticsApi is False:
baseUrl = self.urlApi
baseUrl = self.analyticsUrlApi
@@ -199,26 +229,31 @@ def request(self, url: str, analyticsApi: bool = False, requestType: str = 'GET'
return responseCodeHandling(response)
-class burgissApiSession(burgissApiInit):
+class session(init):
Simplifies request calls by getting auth token and profile id from parent classes
- def __init__(self):
+ def __init__(self, clientId=None, username=None, password=None, urlToken=None, urlApi=None, analyticsUrlApi=None, assertionType=None, scope=None, profileIdType=None):
Initializes a request session, authorizing with the api and gets the profile ID associated with the logged in account
config = configparser.ConfigParser()
- config.read_file(open('config.cfg'))
- self.profileIdType = config.get('API', 'profileIdType')
- self.session = burgissApiInit()
- self.profileResponse = self.session.request(
+ try:
+ config.read_file(open('config.cfg'))
+ self.profileIdType = config.get('API', 'profileIdType')
+ except Exception as e:
+ logging.debug(e)
+ if profileIdType is not None:
+ self.profileIdType = profileIdType
+ self.session = init(clientId, username, password, urlToken, urlApi, analyticsUrlApi, assertionType, scope)
+ self.profileResponse = self.session.requestWrapper(
self.profileId = self.profileResponse[0][self.profileIdType]
def request(self, url: str, analyticsApi: bool = False, profileIdAsHeader: bool = False, optionalParameters: str = '', requestType: str = 'GET', data=''):
- Basic request, built on top of burgissApiInit.request, which handles urls and token auth
+ Basic request, built on top of init.requestWrapper, which handles urls and token auth
url (str): Each burgiss endpoint has different key words e.g. 'investments' -> Gets list of investments
@@ -235,7 +270,7 @@ def request(self, url: str, analyticsApi: bool = False, profileIdAsHeader: bool
response [object]: Request object, refer to the requests package documenation for details
- if profileIdAsHeader == False:
+ if profileIdAsHeader is False:
profileUrl = f'?profileID={self.profileId}'
profileIdHeader = False
@@ -244,18 +279,18 @@ def request(self, url: str, analyticsApi: bool = False, profileIdAsHeader: bool
endpoint = url + profileUrl + optionalParameters
- response = self.session.request(
+ response = self.session.requestWrapper(
endpoint, analyticsApi, requestType, profileIdHeader, data)
return responseCodeHandling(response)
-class burgissApi():
- def __init__(self):
+class transformResponse(session):
+ def __init__(self, clientId=None, username=None, password=None, urlToken=None, urlApi=None, analyticsUrlApi=None, assertionType=None, scope=None, profileIdType=None):
Initializes a request session, authorizing with the api and gets the profile ID associated with the logged in account
- self.apiSession = burgissApiSession()
+ self.apiSession = session(clientId, username, password, urlToken, urlApi, analyticsUrlApi, assertionType, scope, profileIdType)
# storing exceptions here for now until we can determine a better way to handle them
self.nestedExceptions = {'LookupData':
{'method': 'json_normalize',
@@ -266,7 +301,7 @@ def __init__(self):
{'method': 'nestedJson'}
- def parseNestedJson(self, responseJson):
+ def parseNestedJson(self, responseJson: dict):
Custom nested json parser
@@ -284,7 +319,7 @@ def parseNestedJson(self, responseJson):
return dfTransformed
- def flattenResponse(self, resp, field):
+ def flattenResponse(self, resp, field: str):
The api sends a variety of responses, this function determines which parsing method to use based on the response
@@ -308,7 +343,7 @@ def flattenResponse(self, resp, field):
flatDf = pd.json_normalize(respLower)
return flatDf
- def columnNameClean(self, df):
+ def columnNameClean(self, df: pd.DataFrame):
Removes column name prefix from unnested columns
@@ -341,9 +376,9 @@ def getData(self, field: str, profileIdAsHeader: bool = False, OptionalParameter
field, profileIdAsHeader=profileIdAsHeader, optionalParameters=OptionalParameters).json()
# Flatten and clean response
- flatDf = self.flattenResponse(resp, field)
+ flatDf = self.flattenResponse(resp, field)
cleanFlatDf = self.columnNameClean(flatDf)
return cleanFlatDf
def getTransactions(self, id: int, field: str):
@@ -352,7 +387,8 @@ def getTransactions(self, id: int, field: str):
id (int): refers to investmentID
- field (str): 'transaction' model has different key words (e.g. 'valuation', 'cash', 'stock', 'fee', 'funding') -> Gets list of values for indicated investmentID
+ field (str): 'transaction' model has different key words (e.g. 'valuation', 'cash', 'stock', 'fee', 'funding')
+ -> Gets list of values for indicated investmentID
json [object]: dictionary of specified field's values for investmentID
@@ -382,4 +418,10 @@ def pointInTimeAnalyisInput(analysisParameters, globalMeasureParameters, measure
pointInTimeAnalyis['dataCriteria'] = [dataCriteria]
pointInTimeAnalyis['groupBy'] = groupBy
- return json.dumps(pointInTimeAnalyis)
+ print(pointInTimeAnalyis)
+ # Remove any none or null values
+ # pointInTimeAnalyisProcessed = {x:y for x,y in pointInTimeAnalyis.items() if (y is not None and y!='null') }
+ print(pointInTimeAnalyis)
+ return pointInTimeAnalyis
+ # return json.dumps(pointInTimeAnalyis)
