Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

implement ServerStatusDaemon #2674

Merged
51 changes: 36 additions & 15 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -103,11 +103,10 @@ This yielded a >20TB dataframe in Arkouda.
- [Connecting to Arkouda](#run-ak-connect)
5. [Logging](#log-ak)
6. [Type Checking in Arkouda](#typecheck-ak)
7. [Environment Variables](#env-vars-ak)
8. [Versioning](#versioning-ak)
9. [External Systems Integration](#external-integration)
10. [Metrics](#metrics)
11. [Contributing](#contrib-ak)
7. [Versioning](#versioning-ak)
hokiegeek2 marked this conversation as resolved.
Show resolved Hide resolved
8. [External Systems Integration](#external-integration)
9. [Metrics](#metrics)
10. [Contributing](#contrib-ak)

<a id="prereqs"></a>
## Prerequisites <sup><sup><sub><a href="#toc">toc</a></sub></sup></sup>
Expand Down Expand Up @@ -249,22 +248,46 @@ is saved to the .arkouda/tokens.txt file for re-use.

By default, each Arkouda locale utilizes all available memory and CPU cores on the host machine. However, it is possible to set per-locale limits for both memory as well as CPU cores.

The max number of CPU cores utilized by each locale is set via the CHPL_RT_NUM_THREADS_PER_LOCALE environment variable. An example below sets the maximum number of cores for each locale to 16:
#### Per-Locale Memory Limits

There are three approaches to setting the max memory used by each Arkouda locale. Firstly, the built-in Chapel approach sets the max per-locale memory to an explicit number of bytes via the --memMax startup parameter. For example, to set the max memory utilized by each locale to 100 GB, the Arkouda startup command would include the following:

```
export CHPL_RT_NUM_THREADS_PER_LOCALE=16
./arkouda_server --memMax=100000000000
```

The max memory utilized by each locale can be set in one of two ways: percentage of physical memory or a limit set in bytes. By default, the max per-locale memory is set to ninety (90) percent of the physical memory on each Arkouda locale host. If another percentage is desired, this is set via the --perLocaleMemLimit startup parameter. For example, to set max memory utilized by each locale to seventy (70) percent of physical memory, the Arkouda startup command would be as follows:
The Arkouda dynamic memory limit approach sets the per-locale memory limit based upon a configurable percentage of available memory on each locale host. Prior to the execution of each comand, the MemoryMgmt [localeMemAvailable](https://github.com/Bears-R-Us/arkouda/blob/e4a48c52eb00097e6e1dfa365cbc586e2e988a85/src/MemoryMgmt.chpl#L133) function does the following on each locale:
hokiegeek2 marked this conversation as resolved.
Show resolved Hide resolved

1. Verifies if the projected, additional per-locale memory required by the incoming command exceeds the memory currently allocated to Arkouda. If the projected, additional memory is within the memory currently allocated to Arkouda on each locale, the command is allowed to proceed.
hokiegeek2 marked this conversation as resolved.
Show resolved Hide resolved
2. If the projected, additional per-locale memory exceeds the memory currently allocated to Arkouda on any locale, localeMemAvailable checks if the configurable percentage of available memory on each node will accommodate the projected, additional memory of the incoming command. If so, the command is allowed to proceed.
3. If the projected, additional per-locale memory required by the incoming command exceeds the configured percentage of available memory on any locale, localeMemAvailable returns false and a corresponding error is [thrown](https://github.com/Bears-R-Us/arkouda/blob/e4a48c52eb00097e6e1dfa365cbc586e2e988a85/src/ServerConfig.chpl#L348) in the ServerConfig [overMemLimit](https://github.com/Bears-R-Us/arkouda/blob/e4a48c52eb00097e6e1dfa365cbc586e2e988a85/src/ServerConfig.chpl#L286) function.

In the example below, dynamic memory checking is enabled with the default availableMemoryPct of 90, configuring Arkouda to throw an error if (1) the projected, additional memory required for a command exceeds memory currently allocated to Arkouda on 1..n locales and (2) the projected, additional memory will exceed 90 percent of available memory on 1..n locales.

```
./arkouda_server --MemoryMgmt.memMgmtType=MemMgmtType.DYNAMIC
```

Setting additionalMemoryPct to 70 would result in the following startup command:

```
./arkouda_server --MemoryMgmt.memMgmtType=MemMgmtType.DYNAMIC ----MemoryMgmt.additionalMemoryPct=70
```

Important note: dynamic memory checking _works on Linux and Unix systems only._

In the final, default approach, the max memory utilized by each locale is set as percentage of physical memory on the locale0 host, defaulting to 90 percent. If another percentage is desired, this is set via the --perLocaleMemLimit startup parameter. For example, to set max memory utilized by each locale to seventy (70) percent of physical memory on locale0, the Arkouda startup command would include the following:

```
./arkouda_server --perLocaleMemLimit=70
```

In addition, the max per-locale memory can instead be set to an explicit number of bytes via the --memMax startup parameter. For example, to set the max memory utilized by each locale to 100 GB, the Arkouda startup command would be as follows:
#### Per-Locale CPU Core Limits

The max number of CPU cores utilized by each locale is set via the CHPL_RT_NUM_THREADS_PER_LOCALE environment variable. An example below sets the maximum number of cores for each locale to 16:

```
./arkouda_server --memMax=100000000000
export CHPL_RT_NUM_THREADS_PER_LOCALE=16
```

<a id="run-ak-connect"></a>
Expand Down Expand Up @@ -378,17 +401,13 @@ type checking require type hints. Consequently, to opt-out of type checking, sim

</details>

<a id="env-vars-ak"></a>
## Environment Variables <sup><sup><sub><a href="#toc">toc</a></sub></sup></sup>
The various Arkouda aspects (compilation, run-time, client, tests, etc.) can be configured using a number of environment
variables (env vars). See the [ENVIRONMENT](ENVIRONMENT.md) documentation for more details.

<a id="versioning-ak"></a>
## Versioning <sup><sup><sub><a href="#toc">toc</a></sub></sup></sup>
Beginning after tag `v2019.12.10` versioning is now performed using [Versioneer](https://github.com/python-versioneer/python-versioneer)
which determines the version based on the location in `git`.

An example using a hypothetical tag `1.2.3.4`
An example using a hypothetical tag 1.2.3.4
hokiegeek2 marked this conversation as resolved.
Show resolved Hide resolved

```bash
git checkout 1.2.3.4
Expand All @@ -406,6 +425,7 @@ python -m arkouda|tail -n 2
>> Client Version: 1.2.3.4+1.g9dca4c8
>> 1.2.3.4+1.g9dca4c8
```

In the hypothetical cases above _Versioneer_ tells you the version and how far / how many commits beyond the tag your repo is.

When building the server-side code the same versioning information is included in the build. If the server and client do not
Expand All @@ -423,6 +443,7 @@ arkouda server version = v2019.12.10+1679.abc2f48a.dirty
```

For maintainers, creating a new version is as simple as creating a tag in the repository; i.e.

```bash
git checkout master
git tag 1.2.3.4
Expand Down
41 changes: 40 additions & 1 deletion src/MemoryMgmt.chpl
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
module MemoryMgmt {

use Subprocess;
use Logging;
use FileSystem;
use Reflection;

use Logging;
use ServerErrors;
use ArkoudaMemDiagnosticsCompat;
use ArkoudaFileCompat;

Expand Down Expand Up @@ -40,7 +43,19 @@ module MemoryMgmt {
var locale_hostname: string;
}

proc isSupportedOS() : bool throws {
return exists('/proc/meminfo');
}

proc getArkoudaPid() : string throws {
if !isSupportedOS() {
throw new owned ErrorWithContext("getArkoudaPid can only be invoked on Unix and Linux systems",
getLineNumber(),
getRoutineName(),
getModuleName(),
"UnsupportedOSError");
}

var pid = spawn(["pgrep","arkouda_server"], stdout=pipeStyle.pipe);

var pid_string:string;
Expand All @@ -55,6 +70,14 @@ module MemoryMgmt {
}

proc getArkoudaMemAlloc() : uint(64) throws {
if !isSupportedOS() {
throw new owned ErrorWithContext("getArkoudaMemAlloc can only be invoked on Unix and Linux systems",
getLineNumber(),
getRoutineName(),
getModuleName(),
"UnsupportedOSError");
}

var pid = getArkoudaPid();

var sub = spawn(["pmap", pid], stdout=pipeStyle.pipe);
Expand All @@ -74,6 +97,14 @@ module MemoryMgmt {
}

proc getAvailMemory() : uint(64) throws {
if !isSupportedOS() {
throw new owned ErrorWithContext("getAvailMemory can only be invoked on Unix and Linux systems",
getLineNumber(),
getRoutineName(),
getModuleName(),
"UnsupportedOSError");
}

var aFile = open('/proc/meminfo', ioMode.r);
var lines = aFile.reader().lines();
var line : string;
Expand All @@ -92,6 +123,14 @@ module MemoryMgmt {
}

proc getTotalMemory() : uint(64) throws {
if !isSupportedOS() {
throw new owned ErrorWithContext("getTotalMemory can only be invoked on Unix and Linux systems",
getLineNumber(),
getRoutineName(),
getModuleName(),
"UnsupportedOSError");
}

var aFile = open('/proc/meminfo', ioMode.r);
var lines = aFile.reader().lines();
var line : string;
Expand Down
84 changes: 81 additions & 3 deletions src/ServerDaemon.chpl
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ module ServerDaemon {
use ArkoudaListCompat;
use ArkoudaIOCompat;

enum ServerDaemonType {DEFAULT,INTEGRATION,METRICS}
enum ServerDaemonType {DEFAULT,INTEGRATION,METRICS,STATUS}

private config const logLevel = ServerConfig.logLevel;
private config const logChannel = ServerConfig.logChannel;
Expand Down Expand Up @@ -737,8 +737,9 @@ module ServerDaemon {
}

/**
* The MetricsServerDaemon provides an endpoint for gathering user, request,
* locale, and server-scoped metrics
* The MetricsServerDaemon provides a separate endpoint for gathering user, request,
* locale, and server-scoped metrics. The separate port lessens the possibility of
* metrics requests being blocked by command requests.
*/
class MetricsServerDaemon : ArkoudaServerDaemon {

Expand Down Expand Up @@ -839,6 +840,80 @@ module ServerDaemon {
}
}

/**
* The ServerStatusDaemon provides a non-blocking endpoint for retrieving
* server status via a separate, dedicated port to lessen the chances of
* blocking incoming status requests with command requests.
*/
class ServerStatusDaemon : ArkoudaServerDaemon {

var context: ZMQ.Context;
var socket : ZMQ.Socket;

proc init() {
this.socket = this.context.socket(ZMQ.REP);
this.port = try! getEnv('SERVER_STATUS_PORT','5557'):int;

try! this.socket.bind("tcp://*:%?".doFormat(this.port));
try! sdLogger.debug(getModuleName(),
getRoutineName(),
getLineNumber(),
"initialized and listening in port %i".doFormat(
this.port));
}

override proc run() throws {
while !this.shutdownDaemon {
sdLogger.debug(getModuleName(), getRoutineName(), getLineNumber(),
"awaiting status requests on port %i".doFormat(this.port));
var req = this.socket.recv(bytes).decode();

var msg: RequestMsg = extractRequest(req);
var user = msg.user;
var token = msg.token;
var cmd = msg.cmd;
var format = msg.format;
var args = msg.args;
var size = msg.size: int;

var msgArgs: owned MessageArgs;
if size > 0 {
msgArgs = parseMessageArgs(args, size);
}
else {
msgArgs = new owned MessageArgs();
}

var repTuple: MsgTuple;

select cmd {
when "ruok" {
repTuple = new MsgTuple("imok", MsgType.NORMAL);
}

when "getmemstatus" {
repTuple = getMemoryStatusMsg(cmd, msgArgs, st);
}
when "connect" {
if authenticate {
repTuple = new MsgTuple("connected to arkouda status server tcp://*:%i as user " +
"%s with token %s".doFormat(this.port,user,token), MsgType.NORMAL);
} else {
repTuple = new MsgTuple("connected to arkouda status server tcp://*:%i".doFormat(this.port),
MsgType.NORMAL);
}
}
when "getconfig" {repTuple = getconfigMsg(cmd, msgArgs, st);}
}

this.socket.send(serialize(msg=repTuple.msg,msgType=repTuple.msgType,
msgFormat=MsgFormat.STRING, user=user));
}

return;
}
}

proc getServerDaemon(daemonType: ServerDaemonType) : shared ArkoudaServerDaemon throws {
select daemonType {
when ServerDaemonType.DEFAULT {
Expand All @@ -850,6 +925,9 @@ module ServerDaemon {
when ServerDaemonType.METRICS {
return new shared MetricsServerDaemon();
}
when ServerDaemonType.STATUS {
return new shared ServerStatusDaemon();
}
otherwise {
throw getErrorWithContext(
msg="Unsupported ServerDaemonType: %?".doFormat(daemonType),
Expand Down
14 changes: 14 additions & 0 deletions src/ServerErrors.chpl
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,20 @@ module ServerErrors {
proc init(){ super.init(); }
}

/*
* The UnsupportedOSError is thrown if a function cannot be executed on the host OS.
*/
class UnsupportedOSError: ErrorWithContext {

proc init(msg : string, lineNumber: int, routineName: string,
moduleName: string) {
super.init(msg,lineNumber,routineName,moduleName,errorClass='UnsupportedOSError');
}

proc init(){ super.init(); }
}


/*
* Generatea a detailed, context-rich error message for errors such as instances of
* built-in Chapel Errors in a format that matches the Arkouda ErrorWithContext
Expand Down