diff --git a/src/MemoryMgmt.chpl b/src/MemoryMgmt.chpl new file mode 100644 index 0000000000..48cbcd1240 --- /dev/null +++ b/src/MemoryMgmt.chpl @@ -0,0 +1,132 @@ +module MemoryMgmt { + + use Subprocess; + use Memory.Diagnostics; + use Logging; + + private config const logLevel = LogLevel.DEBUG; + private config const logChannel = LogChannel.CONSOLE; + const mmLogger = new Logger(logLevel,logChannel); + + /* + * Indicates whether static locale host memory or dynamically-captured + * memory allocated to the Arkouda process is used to estimate whether + * sufficient memory is available to execute the requested command. + */ + enum MemMgmtType {STATIC,DYNAMIC} + + /* + * The percentage of currently available memory on each locale host + * that is the limit for memory allocated to each Arkouda locale. + */ + config const availableMemoryPct: real = 90; + + /* + * Config param that indicates whether the static memory mgmt logic in + * ServerConfig or dynamic memory mgmt in this module will be used. The + * default is static memory mgmt. + */ + config const memMgmtType = MemMgmtType.STATIC; + + proc getArkoudaPid() : string throws { + var pid = spawn(["pgrep","arkouda_server"], stdout=pipeStyle.pipe); + + var pid_string:string; + var line:string; + + while pid.stdout.readLine(line) { + pid_string = line.strip(); + } + + pid.close(); + return pid_string; + } + + proc getArkoudaMemAlloc() : uint(64) throws { + var pid = getArkoudaPid(); + + var sub = spawn(["pmap", pid], stdout=pipeStyle.pipe); + + var malloc_string:string; + var line:string; + + while sub.stdout.readLine(line) { + if line.find("total") > 0 { + var splits = line.split('total'); + malloc_string = splits(1).strip().strip('K'); + } + } + + sub.close(); + return malloc_string:uint(64) * 1000; + } + + proc getAvailMemory() : uint(64) throws { + var aFile = open('/proc/meminfo', iomode.r); + var lines = aFile.reader().lines(); + var line : string; + + var memAvail:uint(64); + + for line in lines do { + if line.find('MemAvailable:') >= 0 { + var splits = line.split('MemAvailable:'); + memAvail = splits[1].strip().strip(' kB'):uint(64); + } + } + + return (AutoMath.round(availableMemoryPct/100 * memAvail)*1000):uint(64); + } + + proc localeMemAvailable(reqMemory) : bool throws { + var arkMemAlloc = getArkoudaMemAlloc(); + var arkMemUsed = memoryUsed(); + var availMemory = getAvailMemory(); + + mmLogger.debug(getModuleName(),getRoutineName(),getLineNumber(), + "locale: %s reqMemory: %i arkMemAlloc: %i arkMemUsed: %i availMemory: %i".format(here.id, + reqMemory, + arkMemAlloc, + arkMemUsed, + availMemory)); + var newArkoudaMemory = reqMemory:int + arkMemUsed:int; + + if newArkoudaMemory:int <= arkMemAlloc:int { + return true; + } else { + if newArkoudaMemory:int <= availMemory { + return true; + } else { + var msg = "Arkouda memory request %i on locale %s exceeds available memory %i".format(newArkoudaMemory, + here.id, + availMemory); + mmLogger.error(getModuleName(),getRoutineName(),getLineNumber(),msg); + return false; + } + } + } + + /* + * Returns a boolean indicating whether there is either sufficient memory within the + * memory allocated to Arkouda on each locale host; if true for all locales, returns true. + * + * If the reqMemory exceeds the memory currently allocated to at least one locale, each locale + * host is checked to see if there is memory available to allocate more memory to each + * corresponding locale. If there is insufficient memory available on at least one locale, + * returns false. If there is sufficient memory on all locales to allocate sufficient, + * additional memory to Arkouda to execute the command, returns true. + */ + proc isMemAvailable(reqMemory) : bool throws { + var overMemLimit : bool = false; + + coforall loc in Locales with (ref overMemLimit) { + on loc { + if !localeMemAvailable(reqMemory) { + overMemLimit = true; + } + } + } + + return if overMemLimit then false else true; + } +} diff --git a/src/ServerConfig.chpl b/src/ServerConfig.chpl index c890a1915e..9e55eec9bf 100644 --- a/src/ServerConfig.chpl +++ b/src/ServerConfig.chpl @@ -12,6 +12,7 @@ module ServerConfig use Reflection; use ServerErrors; use Logging; + use MemoryMgmt; use ArkoudaFileCompat; @@ -277,6 +278,21 @@ module ServerConfig proc overMemLimit(additionalAmount:int) throws { // must set config var "-smemTrack=true"(compile time) or "--memTrack=true" (run time) // to use memoryUsed() procedure from Chapel's Memory module + proc checkStaticMemoryLimit(total: real) { + if total > getMemLimit() { + var pct = AutoMath.round((total:real / getMemLimit():real * 100):uint); + var msg = "cmd requiring %i bytes of memory exceeds %i limit with projected pct memory used of %i%%".format( + total, getMemLimit(), pct); + scLogger.error(getModuleName(),getRoutineName(),getLineNumber(), msg); + throw getErrorWithContext( + msg=msg, + lineNumber=getLineNumber(), + routineName=getRoutineName(), + moduleName=getModuleName(), + errorClass="ErrorWithContext"); + } + } + if (memTrack) { // this is a per locale total var total = getMemUsed() + (additionalAmount:uint / numLocales:uint); @@ -291,17 +307,43 @@ module ServerConfig (getMemLimit():real * numLocales)) * 100):uint)); } } - if total > getMemLimit() { - var pct = AutoMath.round((total:real / getMemLimit():real * 100):uint); - var msg = "cmd requiring %i bytes of memory exceeds %i limit with projected pct memory used of %i%%".format( + + /* + * If the MemoryMgmt.memMgmtType is STATIC (default), use the memory management logic based upon + * a percentage of the locale0 host machine physical memory. + * + * If DYNAMIC, use the new dynamic memory mgmt capability in the MemoryMgmt module that first determines + * for each locale if there's sufficient space within the memory currently allocated to the Arkouda + * Chapel process to accommodate the projected memory required by the cmd. If not, then MemoryMgmt + * checks the available memory on each locale to see if more can be allocated to the Arkouda-Chapel process. + * If the answer is no on any locale, the cmd is not executed and MemoryMgmt logs the corresponding locales + * server-side. More detailed client-side reporting can be implemented in a later version. + */ + if memMgmtType == MemMgmtType.STATIC { + if total > getMemLimit() { + var pct = AutoMath.round((total:real / getMemLimit():real * 100):uint); + var msg = "cmd requiring %i bytes of memory exceeds %i limit with projected pct memory used of %i%%".format( total, getMemLimit(), pct); - scLogger.error(getModuleName(),getRoutineName(),getLineNumber(), msg); - throw getErrorWithContext( - msg=msg, - lineNumber=getLineNumber(), - routineName=getRoutineName(), - moduleName=getModuleName(), - errorClass="ErrorWithContext"); + scLogger.error(getModuleName(),getRoutineName(),getLineNumber(), msg); + throw getErrorWithContext( + msg=msg, + lineNumber=getLineNumber(), + routineName=getRoutineName(), + moduleName=getModuleName(), + errorClass="ErrorWithContext"); + } + } else { + if !isMemAvailable(additionalAmount) { + var msg = "cmd requiring %i more bytes of memory exceeds available memory on one or more locales".format( + additionalAmount); + scLogger.error(getModuleName(),getRoutineName(),getLineNumber(), msg); + throw getErrorWithContext( + msg=msg, + lineNumber=getLineNumber(), + routineName=getRoutineName(), + moduleName=getModuleName(), + errorClass="ErrorWithContext"); + } } } }