diff --git a/.github/workflows/CI.yml b/.github/workflows/CI.yml index 5b9ae28ecc..c11d53fe88 100644 --- a/.github/workflows/CI.yml +++ b/.github/workflows/CI.yml @@ -22,7 +22,7 @@ jobs: - uses: actions/checkout@v2 - name: Install dependencies run: | - apt-get update && apt-get install -y python3-pip libhdf5-dev hdf5-tools libzmq3-dev + apt-get update && apt-get install -y python3-pip libhdf5-dev hdf5-tools libzmq3-dev libcurl4-openssl-dev python3 -m pip install types-pkg_resources versioneer python3 -m pip install -e .[dev] - name: Arkouda mypy @@ -37,7 +37,7 @@ jobs: - uses: actions/checkout@v2 - name: Install dependencies run: | - apt-get update && apt-get install -y python3-pip libhdf5-dev hdf5-tools libzmq3-dev + apt-get update && apt-get install -y python3-pip libhdf5-dev hdf5-tools libzmq3-dev libcurl4-openssl-dev python3 -m pip install versioneer python3 -m pip install -e .[dev] - name: Arkouda make doc @@ -52,7 +52,7 @@ jobs: - uses: actions/checkout@v2 - name: Install Dependencies run: | - apt-get update && apt-get install -y python3-pip libhdf5-dev hdf5-tools libzmq3-dev + apt-get update && apt-get install -y python3-pip libhdf5-dev hdf5-tools libzmq3-dev libcurl4-openssl-dev python3 -m pip install types-pkg_resources versioneer python3 -m pip install -e .[dev] - name: Arkouda flake8 @@ -77,7 +77,7 @@ jobs: apt-get update && apt-get install -y -V ca-certificates lsb-release wget wget https://apache.jfrog.io/artifactory/arrow/$(lsb_release --id --short | tr 'A-Z' 'a-z')/apache-arrow-apt-source-latest-$(lsb_release --codename --short).deb apt-get install -y -V ./apache-arrow-apt-source-latest-$(lsb_release --codename --short).deb - apt-get update && apt-get install -y libhdf5-dev hdf5-tools libzmq3-dev python3-pip libarrow-dev libparquet-dev + apt-get update && apt-get install -y libhdf5-dev hdf5-tools libzmq3-dev python3-pip libarrow-dev libparquet-dev libcurl4-openssl-dev echo "\$(eval \$(call add-path,/usr/lib/x86_64-linux-gnu/hdf5/serial/))" > Makefile.paths - name: Check python version run: | @@ -104,7 +104,7 @@ jobs: apt-get update && apt-get install -y -V ca-certificates lsb-release wget wget https://apache.jfrog.io/artifactory/arrow/$(lsb_release --id --short | tr 'A-Z' 'a-z')/apache-arrow-apt-source-latest-$(lsb_release --codename --short).deb apt-get install -y -V ./apache-arrow-apt-source-latest-$(lsb_release --codename --short).deb - apt-get update && apt-get install -y libhdf5-dev hdf5-tools libzmq3-dev python3-pip libarrow-dev libparquet-dev + apt-get update && apt-get install -y libhdf5-dev hdf5-tools libzmq3-dev python3-pip libarrow-dev libparquet-dev libcurl4-openssl-dev echo "\$(eval \$(call add-path,/usr/lib/x86_64-linux-gnu/hdf5/serial/))" > Makefile.paths - name: Check chpl version run: | @@ -137,7 +137,7 @@ jobs: apt-get update && apt-get install -y -V ca-certificates lsb-release wget wget https://apache.jfrog.io/artifactory/arrow/$(lsb_release --id --short | tr 'A-Z' 'a-z')/apache-arrow-apt-source-latest-$(lsb_release --codename --short).deb apt-get install -y -V ./apache-arrow-apt-source-latest-$(lsb_release --codename --short).deb - apt-get update && apt-get install -y libhdf5-dev hdf5-tools libzmq3-dev python3-pip libarrow-dev libparquet-dev + apt-get update && apt-get install -y libhdf5-dev hdf5-tools libzmq3-dev python3-pip libarrow-dev libparquet-dev libcurl4-openssl-dev echo "\$(eval \$(call add-path,/usr/lib/x86_64-linux-gnu/hdf5/serial/))" > Makefile.paths - name: Build/Install Arkouda run: | diff --git a/EXTERNAL_INTEGRATION.md b/EXTERNAL_INTEGRATION.md new file mode 100644 index 0000000000..dda9f41313 --- /dev/null +++ b/EXTERNAL_INTEGRATION.md @@ -0,0 +1,188 @@ +# External Integration + +## Overview + +Given the crucial Exploratory Data Analysis (EDA) role Arkouda fulfills in a variety of data science workflows, coupled with the popular trend of deploying machine learning (ML) and deep learning (DL) workloads to cloud environments, enabling Arkouda to be seamlessly integrated into cloud environments such as Kubernetes is an increasingly important use case. + +## Design + +Delivering integration with external systems such as Kubernetes requires four elements, all of which are encapsulated within the [ExternalIntegration](src/ExternalIntegration.chpl) module with the exception of one enum: + +1. Channel--implements logic for writing to external systems via export channels such as file systems and HTTP/S servers. +2. ChannelParams--encapsulates configuration parameters needed by Channel objects to connect to external systems such as file systems and HTTP/S servers. +3. register/deregister--various register and deregister functions that register/deregister Arkouda with external systems via the corresponding Channel. +4. Enums--there are several enum classes that provide controlled vocabulary for external system and channel parameters. + +### Channel + +Channel derived classes override the Channel.write function to write the string payload parameter to an external system. For example, the HttpChannel class leverages the Chapel Curl module to write the JSON-formatted payloads used to register and deregister Arkouda with Kubernetes. + +### ChannelParams + +The ChannelParams derived classes encapsulate the metadata required to connect and write to external systems via a Channel. + +### register and deregister Functions + +The ExternalIntegration register and deregister functions encapsulate logic to (1) generate the payload required to register and deregister Arkouda with/from external systems and (2) utilize a Channel object to deliver the register/deregister payload. + +### Enums + +The following enums provide controlled vocabulary to configure external integration: + +1. SystemType--indicates the external system type, examples of which are KUBERNETES, CONSUL, and REDIS. +2. ChannelType--defines the type of channel used to integrate with an external systems, examples of which are FILE and HTTP. +3. ServiceEndpoint--indicates if the socket is for Arkouda client requests (for Arkouda server commands) or for metrics requests. +4. HttpRequestType, HttpRequestFormat--enums used internally within the ExternalIntegration module to configure the HttpChannel in terms of request type (e.g., POST or PUT) and request format (e.g., TEXT or JSON). +5. Deployment--defined in the [ServerConfig](ServerConfig.chpl) module, the Deployment enum indicates whether Arkouda is deployed in a STANDARD environment (Slurm, bare metal) or KUBERNETES. + +## Building Arkouda with External Integration Support + +Since the ExternalIntegration module delegates HttpChannel registration logic to the Chaple Curl module, building Arkouda with ExternalIntegration requires the libcurl4-openssl-dev lib to be installed. For Debian and Ubuntu Linux distros, the install command is as follows: + +``` +sudo apt-get install libcurl4-openssl-dev +``` + +## Preparing External Systems for Integration + +### Kubernetes + +The initial use case for Arkouda external integration is Kubernetes as described below. + +#### Required Files for Registering with Kubernetes + +The Chapel Curl logic must use HTTPS to register/deregister with Kubernetes via the Kubernetes Rest API. Accordingly, SSL .crt and .key files signed with the certificate authority (CA) file configured for the target Kubernetes cluster must be deployed to all bare-metal/Slurm nodes or as a secret for Arkouda-on-Kubernetes deployments. + +An example of generating the required files is as follows: + +``` +# Generate base key file +openssl genrsa -out arkouda.key 2048 + +# Generate the certificate signing request (CSR) +openssl req -new -key arkouda.key -out arkouda.csr + +# Sign with Kubernetes-configured CA +sudo openssl x509 -req -in arkouda.csr -CA /etc/kubernetes/ssl/kube-ca.pem -CAkey /etc/kubernetes/ssl/kube-ca-key.pem -CAcreateserial -out arkouda.crt -days 730 +``` + +#### Creating the Kubernetes User + +With the private key and signed cert file, create the arkouda user as follows: + +``` +kubectl config set-credentials arkouda --client-certificate=arkouda.crt --client-key=arkouda.key +``` + +#### Authorize read/write Access to Kubernetes Client API + +With the Kubernetes arkouda user and corresponding credentials composed of the arkouda.key and arkouda.crt in place, create the ClusterRoleBinding needed to authorize the arkouda user read/write access to the Kubernetes Client API. + +``` +kind: ClusterRoleBinding +apiVersion: rbac.authorization.k8s.io/v1 +metadata: + name: arkouda-rbac +subjects: +- kind: User + name: arkouda + apiGroup: rbac.authorization.k8s.io +roleRef: + kind: ClusterRole #this must be Role or ClusterRole + name: cluster-admin # must match the name of the Role + apiGroup: rbac.authorization.k8s.io +``` + +``` +kubectl apply -f arkouda-rbac.yaml +``` + +Important note: while this cluster role binding is valid, there may be some environments where it is desirable to narrow the arkouda user privileges. + +## Use Cases and Examples + +### Kubernetes + +A stated above, integrating Arkouda with ML and DL workflows on Kubernetes is an increasingly important use case given the popularity of deploying ML/DL workflows to cloud environments generally, and Kubernetes specifically. + +The registerWithKubernetes function generates the JSON blob containing either the standard (if Arkouda is deployed on Kubernetes) or external (Arkouda is deployed outside of Kubernetes on Slurm or bare-metal) service definition. + +#### Registering Arkouda with Kubernetes + +There are two inner functions that registerWithKubernetes delegates to: + +1. registerAsInternalService--registers Arkouda as a standard Kubernetes service for Arkouda-on-Kubernetes deployments +2. registerAsExternalService--registers Arkouda-on-Slurm or bare metal--in other words, Arkouda deployed outside of Kubernetes--as an external Kubernetes service + +The result of both registration inner functions is making Arkouda accessible to applications such as ML and DL workflows deployed in Kubernetes. + +#### Deregistering Arkouda from Kubernetes + +The deregisterFromKubernetes function deletes the Kubernetes service and is triggered by the ak.shutdown() Arkouda client request. + +#### Kubernetes Integration Configuration Parameters + +The following environmental variables are required to configure Arkouda to register/deregister with Kubernetes: + +1. K8S_HOST--the Kubernetes API connect string +2. NAMESPACE--Kubernetes namespace the service is deployed to +3. KEY_FILE--TLS key file corresponding to a Kubernetes user that has service create/read/delete privileges +4. CERT_FILE--TLS cert file corresponding to a Kubernetes user that has service create/read/delete privileges +5. EXTERNAL_SERVICE_PORT--port Arkouda will be accessible from +6. EXTERNAL_SERVICE_NAME--service name to access Arkouda + +#### Kubernetes Internal Service Registration (Kubernetes) + +Deployment of Arkouda-on-Kubernetes involves two Helm charts: one for the driver (locale0) pod, and one for 1..n locale (locale1...#locales-1) pods. The Helm installation process is detailed [here](https://github.com/hokiegeek2/arkouda/wiki/Arkouda-on-Docker-and-Kubernetes#deploying-multi-locale-arkouda-on-kubernetes). + +Note that the ExternalIntegration.externalSystem param is SystemType.KUBERNETES and the ServerConfig deployment param is Deployment.KUBERNETES (Arkouda is deployed on Kubernetes) + +#### Kubernetes External Service Registration (Slurm) + +An example Slurm BATCH file for an Arkouda instance that registers/deregisters with Kubernetes is shown below. Note that the ExternalIntegration.externalSystem param is SystemType.KUBERNETES and the deployment param is not specified because Slurm is considered a DEFAULT deployment type. + +``` +#!/bin/bash +# +#SBATCH --job-name=arkouda-3-node +#SBATCH --output=/tmp/arkouda.out +#SBATCH --mem=1024 +#SBATCH --ntasks=3 +#SBATCH --nodes=3 + +export CHPL_COMM_SUBSTRATE=udp +export GASNET_MASTERIP='server1' +export SSH_SERVERS='server1 server2 server3' +export GASNET_SPAWNFN=S + +export NAMESPACE=arkouda +export K8S_HOST=https://ace:6443 #result from kubectl cluster-info command +export EXTERNAL_SERVICE_NAME=arkouda-external +export EXTERNAL_SERVICE_PORT=5555 +export KEY_FILE=/opt/arkouda/tls.key #on all slurm hosts +export CERT_FILE=/opt/arkouda/tls.crt #on all slurm hosts +export CACERT_FILE=/etc/kubernetes/ssl/kube-ca.pem #on slurm hosts + +./arkouda_server -nl 3 --ExternalIntegration.systemType=SystemType.KUBERNETES \ + --ServerDaemon.daemonTypes=ServerDaemonType.INTEGRATION +``` + +#### Kubernetes External Service Registration (Bare Metal) + +An example bare metal deployment script for an Arkouda instance that registers/deregisters with Kubernetes is shown below. As is the case with the Arkouda-on-Slurm deployment, the ExternalIntegration.externalSystem param is SystemType.KUBERNETES and the deployment param is not specified because bare metal is considered a DEFAULT deployment type. + +``` +#!/bin/bash + +export GASNET_MASTERIP='server1' +export SSH_SERVERS='server1 server2 server3' +export NAMESPACE=arkouda +export EXTERNAL_SERVICE_NAME=arkouda-external +export EXTERNAL_SERVICE_PORT=5555 +export K8S_HOST=https://ace:6443 #result from kubectl cluster-info command +export KEY_FILE=/opt/arkouda.key #on all bare metal hosts +export CERT_FILE=/opt/arkouda.crt #on all bare metal hosts + +./arkouda_server -nl 3 --ExternalIntegration.systemType=SystemType.KUBERNETES \ + --ServerDaemon.daemonTypes=ServerDaemonType.INTEGRATION +``` \ No newline at end of file diff --git a/README.md b/README.md index ceca0ee31a..63fcfbffdd 100644 --- a/README.md +++ b/README.md @@ -101,8 +101,8 @@ This yielded a >20TB dataframe in Arkouda. 6. [Type Checking in Arkouda](#typecheck-ak) 7. [Environment Variables](#env-vars-ak) 8. [Versioning](#versioning-ak) -9. [Contributing](#contrib-ak) - +9. [External Systems Integration](#external-integration) +10. [Contributing](#contrib-ak) ## Prerequisites toc @@ -363,6 +363,10 @@ python -m arkouda |tail -n 2 git push --tags ``` + +## External Systems Integration toc + +Integrating Arkouda with cloud environments enables users to access Arkouda from machine learning (ML) and deep learning (DL) workflows deployed to Kubernetes as an example. Detailed discussions regarding Arkouda systems integration and specific instructions for registering/deregistering Arkouda with Kubernetes are located in [EXTERNAL INTEGRATION](EXTERNAL_INTEGRATION.md) ## Contributing to Arkouda toc diff --git a/src/ExternalIntegration.chpl b/src/ExternalIntegration.chpl new file mode 100644 index 0000000000..8ec570c236 --- /dev/null +++ b/src/ExternalIntegration.chpl @@ -0,0 +1,548 @@ +module ExternalIntegration { + use Curl; + use URL; + use Reflection; + use FileIO; + use Logging; + use ServerConfig; + use ServerErrors; + + private config const logLevel = ServerConfig.logLevel; + const eiLogger = new Logger(logLevel); + + private config const systemType = SystemType.NONE; + + /* + * libcurl C constants required to configure the Curl core + * of HttpChannel objects. + */ + extern const CURLOPT_VERBOSE:CURLoption; + extern const CURLOPT_USERNAME:CURLoption; + extern const CURLOPT_PASSWORD:CURLoption; + extern const CURLOPT_USE_SSL:CURLoption; + extern const CURLOPT_SSLCERT:CURLoption; + extern const CURLOPT_SSLKEY:CURLoption; + extern const CURLOPT_KEYPASSWD:CURLoption; + extern const CURLOPT_SSLCERTTYPE:CURLoption; + extern const CURLOPT_CAPATH:CURLoption; + extern const CURLOPT_CAINFO:CURLoption; + extern const CURLOPT_URL:CURLoption; + extern const CURLOPT_HTTPHEADER:CURLoption; + extern const CURLOPT_POSTFIELDS:CURLoption; + extern const CURLOPT_CUSTOMREQUEST:CURLoption; + extern const CURLOPT_FAILONERROR:CURLoption; + extern const CURLINFO_RESPONSE_CODE:CURLoption; + extern const CURLOPT_SSL_VERIFYPEER:CURLoption; + + /* + * Enum specifies the type of external system Arkouda will integrate with. + */ + enum SystemType{KUBERNETES,REDIS,CONSUL,NONE}; + + /* + * Enum describing the type of channel used to write to an + * external system. + */ + enum ChannelType{STDOUT,FILE,HTTP}; + + /* + * Enum specifies if the service endpoint is the Arkouda client or metrics + * socket + */ + enum ServiceEndpoint{ARKOUDA_CLIENT,METRICS}; + + /* + * Enum specifies the request type used to write to an external system + * via HTTP. + */ + enum HttpRequestType{POST,PUT,PATCH,DELETE}; + + /* + * Enum specifies the request format used to write to an external system + * via HTTP. + */ + enum HttpRequestFormat{TEXT,JSON,MULTIPART}; + + /* + * Retrieves the host ip address of the locale 0 arkouda_server process, which is + * useful for registering Arkouda with cloud environments such as Kubernetes. + */ + proc getConnectHostIp() throws { + var hostip: string; + on Locales[0] { + var ipString = getLineFromFile('/etc/hosts',getConnectHostname()); + try { + var splits = ipString.split(); + hostip = splits[0]:string; + hostip.split(); + } catch (e: Error) { + throw new IllegalArgumentError( + "invalid hostname -> ip address entry in /etc/hosts %t".format( + e)); + } + } + return hostip; + } + + /* + * Base class defining the Arkouda Channel interface consisting of a + * write method that writes a payload to an external system. + */ + class Channel { + proc write(payload : string) throws { + throw new owned Error("All derived classes must implement write"); + } + } + + /* + * The FileChannel class writes a payload out to a file, either by appending + * or overwriting an existing file or creating and writing to a new file. + */ + class FileChannel : Channel { + var path: string; + var append: bool; + + proc init(params: FileChannelParams) { + super.init(); + this.path = params.path; + this.append = params.append; + } + + override proc write(payload: string) throws { + if append { + appendFile(path, payload); + } else { + writeToFile(path, payload); + } + } + } + + /* + * The HttpChannel class writes a payload out to an HTTP/S endpoint + * in a configurable format via a configurable request type. + */ + class HttpChannel : Channel { + + var url: string; + var requestType: HttpRequestType; + var requestFormat: HttpRequestFormat; + var ssl: bool = false; + var sslKey: string; + var sslCert: string; + var sslCacert: string; + var sslCapath: string; + var sslKeyPasswd: string; + + proc init(params: HttpChannelParams) { + super.init(); + this.url = params.url; + this.requestType = params.requestType; + this.requestFormat = params.requestFormat; + this.ssl = params.ssl; + + if this.ssl { + this.sslKey = params.sslKey; + this.sslCert = params.sslCert; + this.sslCacert = params.sslCacert; + this.sslCapath = params.sslCapath; + this.sslKeyPasswd = params.sslKeyPasswd; + } + } + + proc configureSsl(channel) throws { + Curl.setopt(channel, CURLOPT_USE_SSL, this.ssl); + Curl.setopt(channel, CURLOPT_SSLCERT, this.sslCert); + Curl.setopt(channel, CURLOPT_SSLKEY, this.sslKey); + Curl.setopt(channel, CURLOPT_KEYPASSWD, this.sslKeyPasswd); + Curl.setopt(channel, CURLOPT_SSL_VERIFYPEER, 0); + if logLevel == LogLevel.DEBUG { + Curl.setopt(channel, CURLOPT_VERBOSE, true); + } + } + + proc generateHeader(channel) throws { + var args = new Curl.slist(); + var format = this.requestFormat; + select(format) { + when HttpRequestFormat.JSON { + args.append("Accept: application/json"); + if this.requestType == HttpRequestType.PATCH { + args.append('Content-Type: application/json-patch+json'); + } else { + args.append("Content-Type: application/json"); + } + } + when HttpRequestFormat.TEXT { + args.append("Accept: text/plain"); + args.append("Content-Type: text/plain; charset=UTF-8"); + } + otherwise { + throw new Error("Unsupported HttpFormat"); + } + + } + Curl.curl_easy_setopt(channel, CURLOPT_HTTPHEADER, args); + return args; + } + + /* + * Writes the payload out to an HTTP/S endpoint in a format specified + * by the requestFormat instance attribute via the request type + * specified in the requestType instance attribute. + */ + override proc write(payload: string) throws { + var curl = Curl.curl_easy_init(); + + Curl.curl_easy_setopt(curl, CURLOPT_URL, this.url); + + this.configureSsl(curl); + + Curl.curl_easy_setopt(curl, CURLOPT_FAILONERROR, 1); + + var args = generateHeader(curl); + + Curl.curl_easy_setopt(curl, CURLOPT_POSTFIELDS, payload); + Curl.curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST, this.requestType:string); + + eiLogger.debug(getModuleName(),getRoutineName(),getLineNumber(), + "Configured HttpChannel for type %s format %s".format( + this.requestType, this.requestFormat)); + + eiLogger.debug(getModuleName(),getRoutineName(),getLineNumber(), + "Executing Http request with payload %s".format(payload)); + + var ret = Curl.curl_easy_perform(curl); + + if ret == 0 { + eiLogger.debug(getModuleName(),getRoutineName(),getLineNumber(), + "Successfully executed Http request with payload %s".format(payload)); + } else { + if ret == 22 { + throw getErrorWithContext(getLineNumber(),getRoutineName(),getModuleName(), + "invalid request to overwrite existing entry with payload %s. Delete the existing entry first".format(payload), + "ExternalSystemError"); + + } else { + throw getErrorWithContext(getLineNumber(),getRoutineName(),getModuleName(), + "request with payload %s returned error code %i".format(payload,ret), + "ExternalSystemError"); + } + } + + args.free(); + Curl.curl_easy_cleanup(curl); + + eiLogger.debug(getModuleName(),getRoutineName(),getLineNumber(), + "Closed HttpChannel"); + } + } + + /* + * Encapsulates config parameters needed to open and write to + * a channel connected to an external system. + */ + class ChannelParams { + var channelType: ChannelType; + } + + /* + * Encapsulates config parameters needed to open and write to + * a channel connected to a file. + */ + class FileChannelParams : ChannelParams { + var path: string; + var append: bool; + + proc init(channelType: ChannelType, path: string, append: bool=false) { + super.init(channelType); + this.path = path; + this.append = append; + } + } + + /* + * Encapsulates config parameters needed to open and write to + * a HTTP or HTTPS connection. + */ + class HttpChannelParams : ChannelParams { + var url: string; + var requestType: HttpRequestType; + var requestFormat: HttpRequestFormat; + var ssl: bool = false; + var sslKey: string; + var sslCert: string; + var sslCacert: string; + var sslCapath: string; + var sslKeyPasswd: string; + + proc init(channelType: ChannelType, url: string, requestType: HttpRequestType, + requestFormat: HttpRequestFormat, ssl: bool=false, + sslKey: string, sslCert: string, sslCacert: string, sslCapath: string, + sslKeyPasswd: string) { + super.init(channelType); + this.url = url; + this.requestType = requestType; + this.requestFormat = requestFormat; + this.ssl = ssl; + if this.ssl { + this.sslKey = sslKey; + this.sslCert = sslCert; + this.sslCacert = sslCacert; + this.sslCapath = sslCapath; + this.sslKeyPasswd = sslKeyPasswd; + } + } + } + + /* + * Factory function used to retrieve a Channel based upon ChannelParams. + */ + proc getExternalChannel(params: borrowed ChannelParams) : Channel throws { + const channelType = params.channelType; + + select(channelType) { + when ChannelType.FILE { + return new FileChannel(params: FileChannelParams); + } + when ChannelType.HTTP { + return new HttpChannel(params: HttpChannelParams); + } + otherwise { + throw new owned Error("Invalid channelType"); + } + } + } + + /* + * Registers Arkouda with Kubernetes by creating a Kubernetes Service--and an Endpoints + * if Arkouda is deployed outside of Kubernetes--to enable service discovery of Arkouda + * from applications deployed within Kubernetes. + */ + proc registerWithKubernetes(appName: string, serviceName: string, + servicePort: int, targetServicePort: int) throws { + if deployment == Deployment.KUBERNETES { + registerAsInternalService(appName, serviceName, servicePort, targetServicePort); + } else { + registerAsExternalService(serviceName, servicePort, targetServicePort); + } + + proc generateEndpointCreateUrl() : string throws { + var k8sHost = ServerConfig.getEnv('K8S_HOST'); + var namespace = ServerConfig.getEnv('NAMESPACE'); + return '%s/api/v1/namespaces/%s/endpoints'.format(k8sHost,namespace); + } + + proc generateEndpointUpdateUrl() : string throws { + var k8sHost = ServerConfig.getEnv('K8S_HOST'); + var namespace = ServerConfig.getEnv('NAMESPACE'); + var name = ServerConfig.getEnv('ENDPOINT_NAME'); + return '%s/api/v1/namespaces/%s/endpoints/%s'.format(k8sHost,namespace,name); + } + + proc generateServiceCreateUrl() : string throws { + var k8sHost = ServerConfig.getEnv('K8S_HOST'); + var namespace = ServerConfig.getEnv(name='NAMESPACE',default='default'); + return '%s/api/v1/namespaces/%s/services'.format(k8sHost,namespace); + } + + proc registerAsInternalService(appName: string, serviceName: string, servicePort: int, + targetPort: int) throws { + var serviceUrl = generateServiceCreateUrl(); + var servicePayload = "".join('{"apiVersion": "v1","kind": "Service","metadata": ', + '{"name": "%s"},"spec": {"ports": [{"port": %i,' , + '"protocol": "TCP","targetPort": %i}],"selector":', + ' {"app":"%s"}}}').format( + serviceName, + servicePort, + targetPort, + appName); + + eiLogger.debug(getModuleName(),getRoutineName(),getLineNumber(), + "Registering internal service via payload %s and url %s".format( + servicePayload,serviceUrl)); + + var channel = getExternalChannel(new HttpChannelParams( + channelType=ChannelType.HTTP, + url=serviceUrl, + requestType=HttpRequestType.POST, + requestFormat=HttpRequestFormat.JSON, + ssl=true, + sslKey=ServerConfig.getEnv('KEY_FILE'), + sslCert=ServerConfig.getEnv('CERT_FILE'), + sslCacert=ServerConfig.getEnv('CACERT_FILE'), + sslCapath='', + sslKeyPasswd=ServerConfig.getEnv('KEY_PASSWD'))); + + channel.write(servicePayload); + + eiLogger.debug(getModuleName(),getRoutineName(),getLineNumber(), + "Registered internal service via payload %s and url %s".format( + servicePayload,serviceUrl)); + } + + /* + * Registers Arkouda with Kubernetes by creating a Kubernetes Service and + * Endpoints which together enable service discovery of an Arkouda instance + * deployed outside of Kubernetes from applications deployed within Kubernetes. + */ + proc registerAsExternalService(serviceName: string, servicePort: int, + serviceTargetPort: int) throws { + // Create Kubernetes Service + var serviceUrl = generateServiceCreateUrl(); + var servicePayload = "".join('{"apiVersion": "v1","kind": "Service","metadata": ', + '{"name": "%s"},"spec": {"ports": [{"port": %i,', + '"protocol": "TCP","targetPort": %i}]}}').format( + serviceName, + servicePort, + serviceTargetPort); + eiLogger.debug(getModuleName(),getRoutineName(),getLineNumber(), + "Registering external service via payload %s and url %s".format( + servicePayload,serviceUrl)); + + var channel = getExternalChannel(new HttpChannelParams( + channelType=ChannelType.HTTP, + url=serviceUrl, + requestType=HttpRequestType.POST, + requestFormat=HttpRequestFormat.JSON, + ssl=true, + sslKey=ServerConfig.getEnv('KEY_FILE'), + sslCert=ServerConfig.getEnv('CERT_FILE'), + sslCacert=ServerConfig.getEnv('CACERT_FILE'), + sslCapath='', + sslKeyPasswd=ServerConfig.getEnv('KEY_PASSWD'))); + + channel.write(servicePayload); + eiLogger.debug(getModuleName(),getRoutineName(),getLineNumber(), + "Registered external service via payload %s and url %s".format( + servicePayload,serviceUrl)); + + // Create Kubernetes Endpoints + var endpointUrl = generateEndpointCreateUrl(); + var endpointPayload = "".join('{"kind": "Endpoints","apiVersion": "v1",', + ' "metadata": {"name": "%s"}, "subsets": ', + '[{"addresses": [{"ip": "%s"}],"ports": ', + '[{"port": %i, "protocol": "TCP"}]}]}').format( + serviceName, + getConnectHostIp(), + servicePort); + + channel = getExternalChannel(new HttpChannelParams( + channelType=ChannelType.HTTP, + url=endpointUrl, + requestType=HttpRequestType.POST, + requestFormat=HttpRequestFormat.JSON, + ssl=true, + sslKey=ServerConfig.getEnv('KEY_FILE'), + sslCert=ServerConfig.getEnv('CERT_FILE'), + sslCacert=ServerConfig.getEnv('CACERT_FILE'), + sslCapath='', + sslKeyPasswd=ServerConfig.getEnv('KEY_PASSWD'))); + + eiLogger.debug(getModuleName(),getRoutineName(),getLineNumber(), + "Registering endpoint via payload %s and url %s".format( + endpointPayload,endpointUrl)); + + channel.write(endpointPayload); + eiLogger.debug(getModuleName(),getRoutineName(),getLineNumber(), + "Registered endpoint via payload %s and endpointUrl %s".format( + endpointPayload,endpointUrl)); + } + } + + /* + * Removes the Kubernetes Service and, if applicable, Endpoints that compose the + * service endpoint that enables access to Arkouda deployed within or outside of + * Kubernetes from applications deployed within Kubernetes + */ + proc deregisterFromKubernetes(serviceName: string) throws { + proc generateServiceDeleteUrl(serviceName: string) throws { + var k8sHost = ServerConfig.getEnv('K8S_HOST'); + var namespace = ServerConfig.getEnv('NAMESPACE'); + return '%s/api/v1/namespaces/%s/services/%s'.format(k8sHost,namespace,serviceName); + } + + var url = generateServiceDeleteUrl(serviceName); + var channel = getExternalChannel(new HttpChannelParams( + channelType=ChannelType.HTTP, + url=url, + requestType=HttpRequestType.DELETE, + requestFormat=HttpRequestFormat.JSON, + ssl=true, + sslKey=ServerConfig.getEnv('KEY_FILE'), + sslCert=ServerConfig.getEnv('CERT_FILE'), + sslCacert=ServerConfig.getEnv('CACERT_FILE'), + sslCapath='', + sslKeyPasswd=ServerConfig.getEnv('KEY_PASSWD'))); + channel.write('{}'); + eiLogger.debug(getModuleName(),getRoutineName(),getLineNumber(), + "Deregistered service %s from Kubernetes via url %s".format(serviceName, + url)); + } + + proc getKubernetesRegistrationParameters(serviceEndpoint: ServiceEndpoint) throws { + var serviceName: string; + var servicePort: int; + var targetServicePort: int; + + if serviceEndpoint == ServiceEndpoint.METRICS { + serviceName = ServerConfig.getEnv('METRICS_SERVICE_NAME'); + servicePort = ServerConfig.getEnv('METRICS_SERVICE_PORT', + default='5556'):int; + servicePort = ServerConfig.getEnv('METRICS_SERVICE_TARGET_PORT', + default='5556'):int; + } else { + serviceName = ServerConfig.getEnv('EXTERNAL_SERVICE_NAME'); + servicePort = ServerConfig.getEnv('EXTERNAL_SERVICE_PORT', + default='5555'):int; + targetServicePort = ServerConfig.getEnv('EXTERNAL_SERVICE_TARGET_PORT', + default='5555'):int; + } + return (serviceName,servicePort,targetServicePort); + } + + proc getKubernetesDeregisterParameters(serviceEndpoint: ServiceEndpoint) { + if serviceEndpoint == ServiceEndpoint.METRICS { + return ServerConfig.getEnv('METRICS_SERVICE_NAME'); + } else { + return ServerConfig.getEnv('EXTERNAL_SERVICE_NAME'); + } + } + + /* + * Registers Arkouda with an external system on startup, defaulting to none. + */ + proc registerWithExternalSystem(appName: string, endpoint: ServiceEndpoint) throws { + select systemType { + when SystemType.KUBERNETES { + var params: (string,int,int) = getKubernetesRegistrationParameters(endpoint); + + registerWithKubernetes(appName, params(0), params(1), params(2)); + eiLogger.debug(getModuleName(),getRoutineName(),getLineNumber(), + "Registered Arkouda with Kubernetes"); + } + otherwise { + eiLogger.debug(getModuleName(),getRoutineName(),getLineNumber(), + "Did not register Arkouda with any external systems"); + } + } + } + + /* + * Deregisters Arkouda from an external system upon receipt of shutdown command + */ + proc deregisterFromExternalSystem(endpoint: ServiceEndpoint) throws { + var serviceName = getKubernetesDeregisterParameters(endpoint); + + select systemType { + when SystemType.KUBERNETES { + deregisterFromKubernetes(serviceName); + eiLogger.debug(getModuleName(),getRoutineName(),getLineNumber(), + "Deregistered service %s from Kubernetes".format(serviceName)); + } + otherwise { + eiLogger.debug(getModuleName(),getRoutineName(),getLineNumber(), + "Did not deregister Arkouda from any external system"); + } + } + } +} \ No newline at end of file diff --git a/src/FileIO.chpl b/src/FileIO.chpl index 657dd81b00..f15e8f6ab5 100644 --- a/src/FileIO.chpl +++ b/src/FileIO.chpl @@ -22,12 +22,12 @@ module FileIO { proc appendFile(filePath : string, line : string) throws { var writer : channel; if exists(filePath) { - var aFile = try! open(filePath, iomode.rw); - writer = try! aFile.writer(start=aFile.size); + var aFile = open(filePath, iomode.rw); + writer = aFile.writer(start=aFile.size); } else { - var aFile = try! open(filePath, iomode.cwr); - writer = try! aFile.writer(); + var aFile = open(filePath, iomode.cwr); + writer = aFile.writer(); } writer.writeln(line); @@ -35,9 +35,31 @@ module FileIO { writer.close(); } + proc writeToFile(filePath : string, line : string) throws { + var writer : channel; + var aFile = open(filePath, iomode.cwr); + writer = aFile.writer(); + + writer.writeln(line); + writer.flush(); + writer.close(); + } + + proc writeLinesToFile(filePath : string, lines : string) throws { + var writer : channel; + var aFile = open(filePath, iomode.cwr); + writer = aFile.writer(); + + for line in lines { + writer.writeln(line); + } + writer.flush(); + writer.close(); + } + proc getLineFromFile(filePath : string, lineIndex : int=-1) : string throws { - var aFile = try! open(filePath, iomode.rw); - var lines = try! aFile.lines(); + var aFile = open(filePath, iomode.rw); + var lines = aFile.lines(); var line : string; var returnLine : string; var i = 1; @@ -53,6 +75,24 @@ module FileIO { return returnLine.strip(); } + + proc getLineFromFile(path: string, match: string) throws { + var aFile = open(path, iomode.r); + var reader: channel = aFile.reader(); + var returnLine: string; + + for line in reader.lines() { + if line.find(match) >= 0 { + returnLine = line; + break; + } + } + + reader.flush(); + reader.close(); + + return returnLine; + } proc delimitedFileToMap(filePath : string, delimiter : string=',') : map { var fileMap : map(keyType=string, valType=string, parSafe=false) = diff --git a/src/ServerConfig.chpl b/src/ServerConfig.chpl index a796d05137..716305fcb9 100644 --- a/src/ServerConfig.chpl +++ b/src/ServerConfig.chpl @@ -12,6 +12,15 @@ module ServerConfig use Reflection; use ServerErrors; use Logging; + + enum Deployment {STANDARD,KUBERNETES} + + /* + Type of deployment, which currently is either STANDARD, meaning + that Arkouda is deployed bare-metal or within an HPC environment, + or on Kubernetes, defaults to Deployment.STANDARD + */ + config const deployment = Deployment.STANDARD; /* Trace logging flag @@ -57,6 +66,18 @@ module ServerConfig return here.hostname; } + /* + * Retrieves the hostname of the locale 0 arkouda_server process, which is useful for + * registering Arkouda with cloud environments such as Kubernetes. + */ + proc getConnectHostname() throws { + var hostname: string; + on Locales[0] { + hostname = here.name.strip('-0'); + } + return hostname; + } + /* Indicates whether token authentication is being used for Akrouda server requests */ diff --git a/src/ServerDaemon.chpl b/src/ServerDaemon.chpl index b04e581d14..cdb56d0987 100644 --- a/src/ServerDaemon.chpl +++ b/src/ServerDaemon.chpl @@ -22,6 +22,7 @@ module ServerDaemon { use CommandMap; use Errors; use List; + use ExternalIntegration; enum ServerDaemonType {DEFAULT,INTEGRATION,METRICS} @@ -29,7 +30,7 @@ module ServerDaemon { const sdLogger = new Logger(logLevel); private config const daemonTypes = 'ServerDaemonType.DEFAULT'; - + /** * The ArkoudaServerDaemon class defines the run and shutdown * functions all derived classes must override @@ -499,10 +500,43 @@ module ServerDaemon { } } + /** + * The ExternalIntegrationServerDaemon class registers Arkouda with the + * configured external system and then invokes ArkoudaServerDeamon.run() + */ + class ExternalIntegrationServerDaemon : DefaultServerDaemon { + + override proc run() throws { + on Locales[0] { + var appName: string; + + if serverHostname.count('arkouda-locale') > 0 { + appName = 'arkouda-locale'; + } else { + appName = 'arkouda-server'; + } + + registerWithExternalSystem(appName, ServiceEndpoint.ARKOUDA_CLIENT); + } + super.run(); + } + + override proc shutdown(user: string) throws { + on Locales[here.id] { + deregisterFromExternalSystem(ServiceEndpoint.ARKOUDA_CLIENT); + } + + super.shutdown(user); + } + } + proc getServerDaemon(daemonType: ServerDaemonType) : shared ArkoudaServerDaemon throws { select daemonType { when ServerDaemonType.DEFAULT { - return new DefaultServerDaemon():ArkoudaServerDaemon; + return new DefaultServerDaemon():ArkoudaServerDaemon; + } + when ServerDaemonType.INTEGRATION { + return new ExternalIntegrationServerDaemon():ArkoudaServerDaemon; } otherwise { throw getErrorWithContext(