Skip to content

Commit

Permalink
[UPDATED] JetStream: Handle Tier limits in jsAccountInfo
Browse files Browse the repository at this point in the history
Related to nats-io/nats-architecture-and-design#120

Signed-off-by: Ivan Kozlovic <[email protected]>
  • Loading branch information
kozlovic committed Sep 19, 2022
1 parent ab51e4d commit 0211381
Show file tree
Hide file tree
Showing 3 changed files with 168 additions and 12 deletions.
92 changes: 83 additions & 9 deletions src/jsm.c
Original file line number Diff line number Diff line change
Expand Up @@ -1718,6 +1718,75 @@ js_EraseMsg(jsCtx *js, const char *stream, uint64_t seq, jsOptions *opts, jsErrC
// Account related functions
//

static natsStatus
_unmarshalAccLimits(nats_JSON *json, jsAccountLimits *limits)
{
natsStatus s;
nats_JSON *obj = NULL;

s = nats_JSONGetObject(json, "limits", &obj);
if (obj == NULL)
return NATS_UPDATE_ERR_STACK(s);

IFOK(s, nats_JSONGetLong(obj, "max_memory", &(limits->MaxMemory)));
IFOK(s, nats_JSONGetLong(obj, "max_storage", &(limits->MaxStore)));
IFOK(s, nats_JSONGetLong(obj, "max_streams", &(limits->MaxStreams)));
IFOK(s, nats_JSONGetLong(obj, "max_consumers", &(limits->MaxConsumers)));
IFOK(s, nats_JSONGetLong(obj, "max_ack_pending", &(limits->MaxAckPending)));
IFOK(s, nats_JSONGetLong(obj, "memory_max_stream_bytes", &(limits->MemoryMaxStreamBytes)));
IFOK(s, nats_JSONGetLong(obj, "storage_max_stream_bytes", &(limits->StoreMaxStreamBytes)));
IFOK(s, nats_JSONGetBool(obj, "max_bytes_required", &(limits->MaxBytesRequired)));

return NATS_UPDATE_ERR_STACK(s);
}

static natsStatus
_fillTier(void *userInfo, const char *subject, nats_JSONField *f)
{
jsAccountInfo *ai = (jsAccountInfo*) userInfo;
natsStatus s = NATS_OK;
jsTier *t = NULL;
nats_JSON *json = f->value.vobj;

t = (jsTier*) NATS_CALLOC(1, sizeof(jsTier));
if (t == NULL)
return nats_setDefaultError(NATS_NO_MEMORY);

ai->Tiers[ai->TiersLen++] = t;

DUP_STRING(s, t->Name, subject);
IFOK(s, nats_JSONGetULong(json, "memory", &(t->Memory)));
IFOK(s, nats_JSONGetULong(json, "storage", &(t->Store)));
IFOK(s, nats_JSONGetLong(json, "streams", &(t->Streams)));
IFOK(s, nats_JSONGetLong(json, "consumers", &(t->Consumers)));
IFOK(s, _unmarshalAccLimits(json, &(t->Limits)));

return NATS_UPDATE_ERR_STACK(s);
}

static natsStatus
_unmarshalAccTiers(nats_JSON *json, jsAccountInfo *ai)
{
nats_JSON *obj = NULL;
natsStatus s = NATS_OK;
int n;

s = nats_JSONGetObject(json, "tier", &obj);
if (obj == NULL)
return NATS_UPDATE_ERR_STACK(s);

n = natsStrHash_Count(obj->fields);
if (n == 0)
return NATS_OK;

ai->Tiers = (jsTier**) NATS_CALLOC(n, sizeof(jsTier*));
if (ai->Tiers == NULL)
return nats_setDefaultError(NATS_NO_MEMORY);

s = nats_JSONRange(obj, TYPE_OBJECT, 0, _fillTier, (void*) ai);
return NATS_UPDATE_ERR_STACK(s);
}

natsStatus
js_unmarshalAccountInfo(nats_JSON *json, jsAccountInfo **new_ai)
{
Expand All @@ -1741,15 +1810,8 @@ js_unmarshalAccountInfo(nats_JSON *json, jsAccountInfo **new_ai)
IFOK(s, nats_JSONGetULong(obj, "errors", &(ai->API.Errors)));
obj = NULL;
}
IFOK(s, nats_JSONGetObject(json, "limits", &obj));
if ((s == NATS_OK) && (obj != NULL))
{
IFOK(s, nats_JSONGetLong(obj, "max_memory", &(ai->Limits.MaxMemory)));
IFOK(s, nats_JSONGetLong(obj, "max_storage", &(ai->Limits.MaxStore)));
IFOK(s, nats_JSONGetLong(obj, "max_streams", &(ai->Limits.MaxStreams)));
IFOK(s, nats_JSONGetLong(obj, "max_consumers", &(ai->Limits.MaxConsumers)));
obj = NULL;
}
IFOK(s, _unmarshalAccLimits(json, &(ai->Limits)));
IFOK(s, _unmarshalAccTiers(json, ai));

if (s == NATS_OK)
*new_ai = ai;
Expand Down Expand Up @@ -1827,6 +1889,18 @@ jsAccountInfo_Destroy(jsAccountInfo *ai)
if (ai == NULL)
return;

if (ai->Tiers != NULL)
{
int i;
for (i=0; i<ai->TiersLen; i++)
{
jsTier *t = ai->Tiers[i];

NATS_FREE((char*) t->Name);
NATS_FREE(t);
}
NATS_FREE(ai->Tiers);
}
NATS_FREE(ai->Domain);
NATS_FREE(ai);
}
Expand Down
17 changes: 17 additions & 0 deletions src/nats.h
Original file line number Diff line number Diff line change
Expand Up @@ -889,9 +889,24 @@ typedef struct jsAccountLimits
int64_t MaxStore;
int64_t MaxStreams;
int64_t MaxConsumers;
int64_t MaxAckPending;
int64_t MemoryMaxStreamBytes;
int64_t StoreMaxStreamBytes;
bool MaxBytesRequired;

} jsAccountLimits;

typedef struct jsTier
{
const char *Name;
uint64_t Memory;
uint64_t Store;
int64_t Streams;
int64_t Consumers;
jsAccountLimits Limits;

} jsTier;

/**
* Information about the JetStream usage from the current account.
*/
Expand All @@ -904,6 +919,8 @@ typedef struct jsAccountInfo
char *Domain;
jsAPIStats API;
jsAccountLimits Limits;
jsTier **Tiers;
int TiersLen;

} jsAccountInfo;

Expand Down
71 changes: 68 additions & 3 deletions test/test.c
Original file line number Diff line number Diff line change
Expand Up @@ -21270,6 +21270,7 @@ test_JetStreamUnmarshalAccountInfo(void)
natsStatus s;
nats_JSON *json = NULL;
jsAccountInfo *ai = NULL;
jsTier *r = NULL;
const char *bad[] = {
"{\"memory\":\"abc\"}",
"{\"storage\":\"abc\"}",
Expand All @@ -21284,8 +21285,16 @@ test_JetStreamUnmarshalAccountInfo(void)
"{\"limits\":{\"max_storage\":\"abc\"}}",
"{\"limits\":{\"max_streams\":\"abc\"}}",
"{\"limits\":{\"max_consumers\":\"abc\"}}",
"{\"limits\":{\"max_ack_pending\":\"abc\"}}",
"{\"limits\":{\"memory_max_stream_bytes\":\"abc\"}}",
"{\"limits\":{\"storage_max_stream_bytes\":\"abc\"}}",
"{\"limits\":{\"max_bytes_required\":\"abc\"}}",
"{\"tier\":123}",
"{\"tier\":{1, 2}}",
"{\"tier\":{\"R1\":123}}",
"{\"tier\":{\"R1\":{\"memory\":\"abc\"}}}",
};
char tmp[512];
char tmp[2048];
int i;

for (i=0; i<(int)(sizeof(bad)/sizeof(char*)); i++)
Expand All @@ -21303,7 +21312,14 @@ test_JetStreamUnmarshalAccountInfo(void)
snprintf(tmp, sizeof(tmp), "{\"memory\":1000,\"storage\":2000,\"streams\":5,\"consumers\":7,"\
"\"domain\":\"MyDomain\","\
"\"api\":{\"total\":8,\"errors\":2},"\
"\"limits\":{\"max_memory\":3000,\"max_storage\":4000,\"max_streams\":10,\"max_consumers\":20}}");
"\"limits\":{\"max_memory\":3000,\"max_storage\":4000,\"max_streams\":10,\"max_consumers\":20,"\
"\"max_ack_pending\":100,\"memory_max_stream_bytes\":1000000,\"storage_max_stream_bytes\":2000000,\"max_bytes_required\":true},"\
"\"tier\":{\"R1\":{\"memory\":1000,\"storage\":2000,\"streams\":5,\"consumers\":7,"\
"\"limits\":{\"max_memory\":3000,\"max_storage\":4000,\"max_streams\":10,\"max_consumers\":20,"\
"\"max_ack_pending\":100,\"memory_max_stream_bytes\":1000000,\"storage_max_stream_bytes\":2000000,\"max_bytes_required\":true}},"\
"\"R2\":{\"memory\":2000,\"storage\":3000,\"streams\":8,\"consumers\":9,"\
"\"limits\":{\"max_memory\":4000,\"max_storage\":5000,\"max_streams\":20,\"max_consumers\":30,"\
"\"max_ack_pending\":200,\"memory_max_stream_bytes\":2000000,\"storage_max_stream_bytes\":3000000}}}}");
s = nats_JSONParse(&json, tmp, (int) strlen(tmp));
IFOK(s, js_unmarshalAccountInfo(json, &ai));
testCond((s == NATS_OK) && (ai != NULL)
Expand All @@ -21318,7 +21334,56 @@ test_JetStreamUnmarshalAccountInfo(void)
&& (ai->Limits.MaxMemory == 3000)
&& (ai->Limits.MaxStore == 4000)
&& (ai->Limits.MaxStreams == 10)
&& (ai->Limits.MaxConsumers == 20));
&& (ai->Limits.MaxConsumers == 20)
&& (ai->Limits.MaxAckPending == 100)
&& (ai->Limits.MemoryMaxStreamBytes == 1000000)
&& (ai->Limits.StoreMaxStreamBytes == 2000000)
&& (ai->Limits.MaxBytesRequired)
&& (ai->Tiers != NULL)
&& (ai->TiersLen == 2));

test("Check tier R1: ");
if (strcmp(ai->Tiers[0]->Name, "R1") == 0)
r = ai->Tiers[0];
else if (strcmp(ai->Tiers[1]->Name, "R1") == 0)
r = ai->Tiers[1];
else
s = NATS_ERR;
testCond((s == NATS_OK)
&& (r->Memory == 1000)
&& (r->Store == 2000)
&& (r->Streams == 5)
&& (r->Consumers == 7)
&& (r->Limits.MaxMemory == 3000)
&& (r->Limits.MaxStore == 4000)
&& (r->Limits.MaxStreams == 10)
&& (r->Limits.MaxConsumers == 20)
&& (r->Limits.MaxAckPending == 100)
&& (r->Limits.MemoryMaxStreamBytes == 1000000)
&& (r->Limits.StoreMaxStreamBytes == 2000000)
&& r->Limits.MaxBytesRequired);

test("Check tier R2: ");
if (strcmp(ai->Tiers[0]->Name, "R2") == 0)
r = ai->Tiers[0];
else if (strcmp(ai->Tiers[1]->Name, "R2") == 0)
r = ai->Tiers[1];
else
s = NATS_ERR;
testCond((s == NATS_OK)
&& (r->Memory == 2000)
&& (r->Store == 3000)
&& (r->Streams == 8)
&& (r->Consumers == 9)
&& (r->Limits.MaxMemory == 4000)
&& (r->Limits.MaxStore == 5000)
&& (r->Limits.MaxStreams == 20)
&& (r->Limits.MaxConsumers == 30)
&& (r->Limits.MaxAckPending == 200)
&& (r->Limits.MemoryMaxStreamBytes == 2000000)
&& (r->Limits.StoreMaxStreamBytes == 3000000)
&& (!r->Limits.MaxBytesRequired));

nats_JSONDestroy(json);
jsAccountInfo_Destroy(ai);
}
Expand Down

0 comments on commit 0211381

Please sign in to comment.