Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HPCC-32958 Roxie dynamic priority #19300

Open
wants to merge 2 commits into
base: candidate-9.8.x
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions roxie/ccd/ccd.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ void setMulticastEndpoints(unsigned numChannels);
// Priority flags or'ed into activityId; the combination of bits selects the queue a packet is placed on.
#define ROXIE_SLA_PRIORITY 0x40000000 // mask in activityId indicating it goes on the SLA priority queue
#define ROXIE_HIGH_PRIORITY 0x80000000 // mask in activityId indicating it goes on the fast queue
#define ROXIE_LOW_PRIORITY 0x00000000 // mask in activityId indicating it goes on the slow queue (= default)
// background priority queue is when both ROXIE_SLA_PRIORITY and ROXIE_HIGH_PRIORITY are set
// Union of all priority flags. ROXIE_LOW_PRIORITY is zero, so it contributes no bits to the mask.
#define ROXIE_PRIORITY_MASK (ROXIE_SLA_PRIORITY | ROXIE_HIGH_PRIORITY | ROXIE_LOW_PRIORITY)

#define ROXIE_ACTIVITY_FETCH 0x20000000 // or'ed into activityId for fetch part of full keyed join activities
Expand Down Expand Up @@ -303,6 +304,7 @@ extern StringArray allQuerySetNames;
extern bool blockedLocalAgent;
extern bool acknowledgeAllRequests;
extern unsigned packetAcknowledgeTimeout;
extern unsigned dynPriorityAdjustTime;
extern bool alwaysTrustFormatCrcs;
extern bool allFilesDynamic;
extern bool lockSuperFiles;
Expand Down
5 changes: 5 additions & 0 deletions roxie/ccd/ccdcontext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1530,6 +1530,11 @@ class CRoxieContextBase : implements IRoxieAgentContext, implements ICodeContext
return options;
}

// Returns the value reported by this context's elapsedTimer, in milliseconds.
virtual unsigned queryElapsedMs() const
{
const unsigned elapsed = elapsedTimer.elapsedMs();
return elapsed;
}

const char *queryAuthToken()
{
return authToken.str();
Expand Down
1 change: 1 addition & 0 deletions roxie/ccd/ccdcontext.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ interface IRoxieAgentContext : extends IRoxieContextLogger
virtual void noteChildGraph(unsigned id, IActivityGraph *childGraph) = 0;
virtual roxiemem::IRowManager &queryRowManager() = 0;
virtual const QueryOptions &queryOptions() const = 0;
virtual unsigned queryElapsedMs() const = 0;
virtual void addAgentsReplyLen(unsigned len, unsigned duplicates, unsigned resends) = 0;
virtual const char *queryAuthToken() = 0;
virtual const IResolvedFile *resolveLFN(const char *filename, bool isOpt, bool isPrivilegedUser) = 0;
Expand Down
12 changes: 8 additions & 4 deletions roxie/ccd/ccdlistener.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1239,11 +1239,12 @@ class RoxieWorkUnitWorker : public RoxieQueryWorker
{
void noteQuery(bool failed, unsigned elapsedTime, unsigned priority)
{
switch(priority)
switch((int)priority)
{
case 0: loQueryStats.noteQuery(failed, elapsedTime); break;
case 1: hiQueryStats.noteQuery(failed, elapsedTime); break;
case 2: slaQueryStats.noteQuery(failed, elapsedTime); break;
case -1: bgQueryStats.noteQuery(failed, elapsedTime); break;
}
combinedQueryStats.noteQuery(failed, elapsedTime);
}
Expand Down Expand Up @@ -1355,11 +1356,12 @@ class RoxieWorkUnitWorker : public RoxieQueryWorker
isBlind = isBlind || blindLogging;
logctx.setBlind(isBlind);
priority = queryFactory->queryOptions().priority;
switch (priority)
switch ((int)priority)
{
case 0: loQueryStats.noteActive(); break;
case 1: hiQueryStats.noteActive(); break;
case 2: slaQueryStats.noteActive(); break;
case -1: bgQueryStats.noteActive(); break;
}
combinedQueryStats.noteActive();
Owned<IRoxieServerContext> ctx = queryFactory->createContext(wu, logctx);
Expand Down Expand Up @@ -1524,11 +1526,12 @@ class RoxieProtocolMsgContext : implements IHpccProtocolMsgContext, public CInte
virtual void noteQueryActive()
{
unsigned priority = getQueryPriority();
switch (priority)
switch ((int)priority)
{
case 0: loQueryStats.noteActive(); break;
case 1: hiQueryStats.noteActive(); break;
case 2: slaQueryStats.noteActive(); break;
case -1: bgQueryStats.noteActive(); break;
}
unknownQueryStats.noteComplete();
combinedQueryStats.noteActive();
Expand Down Expand Up @@ -1677,11 +1680,12 @@ class RoxieProtocolMsgContext : implements IHpccProtocolMsgContext, public CInte
}
else
{
switch(getQueryPriority())
switch((int)getQueryPriority())
{
case 0: loQueryStats.noteQuery(failed, elapsedTime); break;
case 1: hiQueryStats.noteQuery(failed, elapsedTime); break;
case 2: slaQueryStats.noteQuery(failed, elapsedTime); break;
case -1: bgQueryStats.noteQuery(failed, elapsedTime); break;
default: unknownQueryStats.noteQuery(failed, elapsedTime); return; // Don't include unknown in the combined stats
}
combinedQueryStats.noteQuery(failed, elapsedTime);
Expand Down
2 changes: 2 additions & 0 deletions roxie/ccd/ccdmain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ unsigned numRequestArrayThreads = 5;
bool blockedLocalAgent = true;
bool acknowledgeAllRequests = true;
unsigned packetAcknowledgeTimeout = 100;
unsigned dynPriorityAdjustTime = 0; // default off (0)
unsigned headRegionSize;
unsigned ccdMulticastPort;
bool enableHeartBeat = true;
Expand Down Expand Up @@ -1007,6 +1008,7 @@ int CCD_API roxie_main(int argc, const char *argv[], const char * defaultYaml)
acknowledgeAllRequests = topology->getPropBool("@acknowledgeAllRequests", acknowledgeAllRequests);
headRegionSize = topology->getPropInt("@headRegionSize", 0);
packetAcknowledgeTimeout = topology->getPropInt("@packetAcknowledgeTimeout", packetAcknowledgeTimeout);
dynPriorityAdjustTime = topology->getPropInt("@dynPriorityAdjustTime", 0);
ccdMulticastPort = topology->getPropInt("@multicastPort", CCD_MULTICAST_PORT);
statsExpiryTime = topology->getPropInt("@statsExpiryTime", 3600);
roxiemem::setMemTraceSizeLimit((memsize_t) topology->getPropInt64("@memTraceSizeLimit", 0));
Expand Down
41 changes: 39 additions & 2 deletions roxie/ccd/ccdquery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,7 @@ class CSharedOnceContext : public CInterfaceOf<ISharedOnceContext>
QueryOptions::QueryOptions()
{
priority = 0;
dynPriority = 0;
timeLimit = defaultTimeLimit[0];
warnTimeLimit = defaultWarnTimeLimit[0];

Expand Down Expand Up @@ -358,6 +359,7 @@ QueryOptions::QueryOptions()
QueryOptions::QueryOptions(const QueryOptions &other)
{
priority = other.priority;
dynPriority = other.dynPriority;
timeLimit = other.timeLimit;
warnTimeLimit = other.warnTimeLimit;

Expand Down Expand Up @@ -400,8 +402,18 @@ void QueryOptions::setFromWorkUnit(IConstWorkUnit &wu, const IPropertyTree *stat
updateFromWorkUnit(priority, wu, "priority");
if (stateInfo)
updateFromContext(priority, stateInfo, "@priority");
timeLimit = defaultTimeLimit[priority];
warnTimeLimit = defaultWarnTimeLimit[priority];
dynPriority = priority;
if ((int)priority < 0)
{
// use LOW queue time limits ...
timeLimit = defaultTimeLimit[0];
warnTimeLimit = defaultWarnTimeLimit[0];
}
else
{
timeLimit = defaultTimeLimit[priority];
warnTimeLimit = defaultWarnTimeLimit[priority];
}
updateFromWorkUnit(timeLimit, wu, "timeLimit");
updateFromWorkUnit(warnTimeLimit, wu, "warnTimeLimit");
updateFromWorkUnitM(memoryLimit, wu, "memoryLimit");
Expand Down Expand Up @@ -486,6 +498,31 @@ void QueryOptions::setFromContext(const IPropertyTree *ctx)
if (ctx)
{
// Note: priority cannot be set at context level
// b/c this is after activities have been created, but we could
// dynamically adj priority in the header activityId before sending
int tmpPriority;
updateFromContext(tmpPriority, ctx, "@priority", "_Priority");
if (tmpPriority > 1)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be clearer to define constants such as MinQueryPriority and MaxQueryPriority and compare against those. (Rather than comparing with > 1, it would be clearer to compare with > 2, since comparing with > 1 leaks the next test into the bounds test.)

tmpPriority = 1;
if (tmpPriority < -1)
tmpPriority = -1;

if (tmpPriority < (int)priority)
{
dynPriority = tmpPriority;
if (dynPriority < 0)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably not worth it, but this could be shared with the code in setFromWorkUnit, e.g. via a setDynamicPriority() function.

{
// use LOW queue time limits ...
timeLimit = defaultTimeLimit[0];
warnTimeLimit = defaultWarnTimeLimit[0];
}
else
{
timeLimit = defaultTimeLimit[dynPriority];
warnTimeLimit = defaultWarnTimeLimit[dynPriority];
}
}

updateFromContext(timeLimit, ctx, "@timeLimit", "_TimeLimit");
updateFromContext(warnTimeLimit, ctx, "@warnTimeLimit", "_WarnTimeLimit");
updateFromContextM(memoryLimit, ctx, "@memoryLimit", "_MemoryLimit");
Expand Down
2 changes: 1 addition & 1 deletion roxie/ccd/ccdquery.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,8 @@ class QueryOptions
void setFromContext(const IPropertyTree *ctx);
void setFromAgentLoggingFlags(unsigned loggingFlags);


unsigned priority;
mutable int dynPriority;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Technically this should be an atomic, but it is unlikely to cause any grief.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be better as an unsigned, so that it is consistent with priority?

unsigned timeLimit;
unsigned warnTimeLimit;
unsigned traceLimit;
Expand Down
67 changes: 39 additions & 28 deletions roxie/ccd/ccdqueue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ StringBuffer &RoxiePacketHeader::toString(StringBuffer &ret) const
case ROXIE_SLA_PRIORITY: ret.append("SLA"); break;
case ROXIE_HIGH_PRIORITY: ret.append("HIGH"); break;
case ROXIE_LOW_PRIORITY: ret.append("LOW"); break;
case ROXIE_SLA_PRIORITY + ROXIE_HIGH_PRIORITY: ret.append("BG"); break;
default: ret.append("???"); break;
}
ret.appendf(" queryHash=%" I64F "x ch=%u seq=%d cont=%d server=", queryHash, channel, overflowSequence, continueSequence);
Expand Down Expand Up @@ -1166,11 +1167,14 @@ class RoxieQueue : public CInterface, implements IThreadFactory
public:
IMPLEMENT_IINTERFACE;

RoxieQueue(unsigned _headRegionSize, unsigned _numWorkers)
RoxieQueue(unsigned _headRegionSize, unsigned _numWorkers, const char *qname=nullptr)
{
headRegionSize = _headRegionSize;
numWorkers = _numWorkers;
workers.setown(createThreadPool("RoxieWorkers", this, false, nullptr, numWorkers));
StringBuffer tname("RoxieWorkers");
if (qname && *qname)
tname.appendf(" (%s)", qname);
workers.setown(createThreadPool(tname.str(), this, false, nullptr, numWorkers));
started = 0;
idle = 0;
if (IBYTIbufferSize)
Expand Down Expand Up @@ -1904,12 +1908,13 @@ class RoxieReceiverBase : implements IRoxieOutputQueueManager, public CInterface
RoxieQueue slaQueue;
RoxieQueue hiQueue;
RoxieQueue loQueue;
RoxieQueue bgQueue;
unsigned numWorkers;

public:
IMPLEMENT_IINTERFACE;

RoxieReceiverBase(unsigned _numWorkers) : slaQueue(headRegionSize, _numWorkers), hiQueue(headRegionSize, _numWorkers), loQueue(headRegionSize, _numWorkers), numWorkers(_numWorkers)
RoxieReceiverBase(unsigned _numWorkers) : slaQueue(headRegionSize, _numWorkers, "SLA"), hiQueue(headRegionSize, _numWorkers, "HIGH"), loQueue(headRegionSize, _numWorkers, "LOW"), bgQueue(headRegionSize, _numWorkers/2 + 1, "BG"), numWorkers(_numWorkers)
{
}

Expand All @@ -1923,27 +1928,31 @@ class RoxieReceiverBase : implements IRoxieOutputQueueManager, public CInterface
slaQueue.setHeadRegionSize(newSize);
hiQueue.setHeadRegionSize(newSize);
loQueue.setHeadRegionSize(newSize);
bgQueue.setHeadRegionSize(newSize);
}

// Starts every priority queue owned by this receiver: low, high, SLA, then background.
virtual void start()
{
loQueue.start();
hiQueue.start();
slaQueue.start();
bgQueue.start(); // consider nice(+3) BG threads
}

// Calls stopAll() on every priority queue owned by this receiver: low, high, SLA, then background.
virtual void stop()
{
loQueue.stopAll();
hiQueue.stopAll();
slaQueue.stopAll();
bgQueue.stopAll();
}

// Calls join() on every priority queue owned by this receiver: low, high, SLA, then background.
// NOTE(review): presumably this waits for each queue's worker threads to finish - confirm against RoxieQueue::join().
virtual void join()
{
loQueue.join();
hiQueue.join();
slaQueue.join();
bgQueue.join();
}

IArrayOf<CallbackEntry> callbacks;
Expand Down Expand Up @@ -2254,7 +2263,7 @@ class DelayedPacketQueue
}

// Move any that we are done waiting for our buddy onto the active queue
void checkExpired(unsigned now, RoxieQueue &slaQueue, RoxieQueue &hiQueue, RoxieQueue &loQueue)
void checkExpired(unsigned now, RoxieQueue &slaQueue, RoxieQueue &hiQueue, RoxieQueue &loQueue, RoxieQueue &bgQueue)
{
assert(GetCurrentThreadId()==roxiePacketReaderThread);
DelayedPacketEntry *finger = head;
Expand All @@ -2270,12 +2279,13 @@ class DelayedPacketQueue
DBGLOG("No IBYTI received in time for delayed packet %s - enqueuing", header.toString(s).str());
}
unsigned __int64 IBYTIdelay = nsTick()-packet->queryEnqueuedTimeStamp();
if (header.activityId & ROXIE_SLA_PRIORITY)
slaQueue.enqueue(packet, IBYTIdelay);
else if (header.activityId & ROXIE_HIGH_PRIORITY)
hiQueue.enqueue(packet, IBYTIdelay);
else
loQueue.enqueue(packet, IBYTIdelay);
switch(header.activityId & ROXIE_PRIORITY_MASK)
{
case ROXIE_SLA_PRIORITY: slaQueue.enqueue(packet, IBYTIdelay); break;
case ROXIE_HIGH_PRIORITY: hiQueue.enqueue(packet, IBYTIdelay); break;
case ROXIE_LOW_PRIORITY: loQueue.enqueue(packet, IBYTIdelay); break;
default: bgQueue.enqueue(packet, IBYTIdelay); break;
}
for (unsigned subChannel = 0; subChannel < MAX_SUBCHANNEL; subChannel++)
{
if (header.subChannels[subChannel].isMe() || header.subChannels[subChannel].isNull())
Expand Down Expand Up @@ -2354,11 +2364,11 @@ class DelayedPacketQueueChannel : public CInterface
}
return min;
}
void checkExpired(unsigned now, RoxieQueue &slaQueue, RoxieQueue &hiQueue, RoxieQueue &loQueue)
void checkExpired(unsigned now, RoxieQueue &slaQueue, RoxieQueue &hiQueue, RoxieQueue &loQueue, RoxieQueue &bgQueue)
{
for (unsigned queue = 0; queue <= maxSeen; queue++)
{
queues[queue].checkExpired(now, slaQueue, hiQueue, loQueue);
queues[queue].checkExpired(now, slaQueue, hiQueue, loQueue, bgQueue);
}
}
private:
Expand Down Expand Up @@ -2398,11 +2408,11 @@ class DelayedPacketQueueManager
}
return ret;
}
void checkExpired(unsigned now, RoxieQueue &slaQueue, RoxieQueue &hiQueue, RoxieQueue &loQueue)
void checkExpired(unsigned now, RoxieQueue &slaQueue, RoxieQueue &hiQueue, RoxieQueue &loQueue, RoxieQueue &bgQueue)
{
ForEachItemIn(idx, channels)
{
channels.item(idx).checkExpired(now, slaQueue, hiQueue, loQueue);
channels.item(idx).checkExpired(now, slaQueue, hiQueue, loQueue, bgQueue);
}
}
private:
Expand Down Expand Up @@ -2894,12 +2904,13 @@ class RoxieSocketQueueManager : public RoxieReceiverBase
StringBuffer s;
DBGLOG("Read roxie packet: %s", header.toString(s).str());
}
if (header.activityId & ROXIE_SLA_PRIORITY)
processMessage(mb, header, slaQueue);
else if (header.activityId & ROXIE_HIGH_PRIORITY)
processMessage(mb, header, hiQueue);
else
processMessage(mb, header, loQueue);
switch(header.activityId & ROXIE_PRIORITY_MASK)
{
case ROXIE_SLA_PRIORITY: processMessage(mb, header, slaQueue); break;
case ROXIE_HIGH_PRIORITY: processMessage(mb, header, hiQueue); break;
case ROXIE_LOW_PRIORITY: processMessage(mb, header, loQueue); break;
default: processMessage(mb, header, bgQueue); break;
}
}
catch (IException *E)
{
Expand Down Expand Up @@ -2938,7 +2949,7 @@ class RoxieSocketQueueManager : public RoxieReceiverBase
}
}
#ifdef NEW_IBYTI
delayed.checkExpired(msTick(), slaQueue, hiQueue, loQueue);
delayed.checkExpired(msTick(), slaQueue, hiQueue, loQueue, bgQueue);
#endif
}
return 0;
Expand Down Expand Up @@ -3708,13 +3719,13 @@ class RoxieLocalQueueManager : public RoxieReceiverBase
return; // No point sending the retry in localAgent mode
}
RoxieQueue *targetQueue;
if (header.activityId & ROXIE_SLA_PRIORITY)
targetQueue = &slaQueue;
else if (header.activityId & ROXIE_HIGH_PRIORITY)
targetQueue = &hiQueue;
else
targetQueue = &loQueue;

switch(header.activityId & ROXIE_PRIORITY_MASK)
{
case ROXIE_SLA_PRIORITY: targetQueue = &slaQueue; break;
case ROXIE_HIGH_PRIORITY: targetQueue = &hiQueue; break;
case ROXIE_LOW_PRIORITY: targetQueue = &loQueue; break;
default: targetQueue = &bgQueue; break;
}
Owned<ISerializedRoxieQueryPacket> serialized = packet->serialize();
if (header.channel)
{
Expand Down
Loading
Loading