Skip to content

Commit

Permalink
MOSIP-32454: Added additional logs to debug (#257)
Browse files Browse the repository at this point in the history
Signed-off-by: HimajaDhanyamraju2 <[email protected]>
Co-authored-by: HimajaDhanyamraju2 <[email protected]>
  • Loading branch information
HimajaDhanyamraju2 and HimajaDhanyamraju2 authored May 7, 2024
1 parent 6cf233e commit 8837edd
Showing 1 changed file with 13 additions and 0 deletions.
13 changes: 13 additions & 0 deletions hub/start_hub.bal
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,13 @@ public function main() returns error? {
_ = @strand {thread: "any"} start syncSubscribersCache();

// Start the Hub
log:printInfo("Hub initialization done and starting the hub...");

http:Listener httpListener = check new (config:HUB_PORT);
check httpListener.attach(healthCheckService, "hub/actuator/health");
websubhub:Listener hubListener = check new (httpListener);
check hubListener.attach(hubService, "hub");
log:printInfo("Starting the hub...");
check hubListener.'start();
}

Expand All @@ -70,10 +73,12 @@ function validateConfigs() returns boolean|error {
}
return error("Found error in decoding the encryption key. Please set valid base64 encoded bytes as encryption key to proceed.");
}
log:printInfo("Configs validated and returning true...");
return true;
}

function syncRegsisteredTopicsCache() {
log:printInfo("syncRegisteredTopicsCache started...");
do {
while true {
websubhub:TopicRegistration[]? persistedTopics = check getPersistedTopics();
Expand All @@ -92,6 +97,7 @@ function syncRegsisteredTopicsCache() {
}

function getPersistedTopics() returns websubhub:TopicRegistration[]|error? {
log:printInfo("Entered getPersistedTopics method...");
kafka:ConsumerRecord[] records = check conn:registeredTopicsConsumer->poll(config:POLLING_INTERVAL);
if records.length() > 0 {
kafka:ConsumerRecord lastRecord = records.pop();
Expand All @@ -107,6 +113,7 @@ function getPersistedTopics() returns websubhub:TopicRegistration[]|error? {
}

function deSerializeTopicsMessage(string lastPersistedData) returns websubhub:TopicRegistration[]|error {
log:printInfo("Entered deSerializeTopicsMessage method...");
websubhub:TopicRegistration[] currentTopics = [];
json[] payload = <json[]>check value:fromJsonString(lastPersistedData);
foreach var data in payload {
Expand All @@ -117,6 +124,7 @@ function deSerializeTopicsMessage(string lastPersistedData) returns websubhub:To
}

function refreshTopicCache(websubhub:TopicRegistration[] persistedTopics) {
log:printInfo("Entered refreshTopicCache method...");
lock {
registeredTopicsCache.removeAll();
}
Expand All @@ -129,6 +137,7 @@ function refreshTopicCache(websubhub:TopicRegistration[] persistedTopics) {
}

function syncSubscribersCache() {
log:printInfo("Entered syncSubscribersCache method...");
do {
while true {
websubhub:VerifiedSubscription[]? persistedSubscribers = check getPersistedSubscribers();
Expand All @@ -148,6 +157,7 @@ function syncSubscribersCache() {
}

function getPersistedSubscribers() returns websubhub:VerifiedSubscription[]|error? {
log:printInfo("Entered getPersistedSubscribers method...");
kafka:ConsumerRecord[] records = check conn:subscribersConsumer->poll(config:POLLING_INTERVAL);
if records.length() > 0 {
kafka:ConsumerRecord lastRecord = records.pop();
Expand All @@ -163,6 +173,7 @@ function getPersistedSubscribers() returns websubhub:VerifiedSubscription[]|erro
}

function deSerializeSubscribersMessage(string lastPersistedData) returns websubhub:VerifiedSubscription[]|error {
log:printInfo("Entered deSerializeSubscribersMessage method...");
websubhub:VerifiedSubscription[] currentSubscriptions = [];
json[] payload = <json[]>check value:fromJsonString(lastPersistedData);
foreach var data in payload {
Expand All @@ -173,6 +184,7 @@ function deSerializeSubscribersMessage(string lastPersistedData) returns websubh
}

function refreshSubscribersCache(websubhub:VerifiedSubscription[] persistedSubscribers) {
log:printInfo("Entered refreshSubscribersCache method...");
final readonly & string[] subscriberIds = persistedSubscribers.'map(sub => util:generateSubscriberId(sub.hubTopic, sub.hubCallback)).cloneReadOnly();
lock {
string[] unsubscribedSubscribers = subscribersCache.keys().filter('key => subscriberIds.indexOf('key) is ());
Expand All @@ -183,6 +195,7 @@ function refreshSubscribersCache(websubhub:VerifiedSubscription[] persistedSubsc
}

function startMissingSubscribers(websubhub:VerifiedSubscription[] persistedSubscribers) returns error? {
log:printInfo("Entered startMissingSubscribers method...");
foreach var subscriber in persistedSubscribers {
string topicName = util:sanitizeTopicName(subscriber.hubTopic);
string subscriberId = util:generateSubscriberId(subscriber.hubTopic, subscriber.hubCallback);
Expand Down

0 comments on commit 8837edd

Please sign in to comment.