Skip to content

Commit

Permalink
Changes done for v2.3.5.
Browse files Browse the repository at this point in the history
  • Loading branch information
nysenthil committed May 16, 2022
1 parent b8fe2d0 commit a55787d
Show file tree
Hide file tree
Showing 7 changed files with 152 additions and 43 deletions.
4 changes: 4 additions & 0 deletions com.ibm.streamsx.sttgateway/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changes

## v2.3.5
* May/16/2022
* Added code and logic necessary for the VgwDataRouter application to select a speech processor for handling a new voice call in a round robin fashion. This will allow for an even distribution of the voice calls across the configured number of speech processors.

## v2.3.4
* May/11/2022
* Added code and logic necessary to handle certain error situations where the SpeechProcessor will send much more than the required two transcriptionCompleted signals.
Expand Down
2 changes: 1 addition & 1 deletion com.ibm.streamsx.sttgateway/info.xml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

**Note:** This toolkit requires c++11 support.
</description>
<version>2.3.4</version>
<version>2.3.5</version>
<requiredProductVersion>4.2.1.6</requiredProductVersion>
</identity>
<dependencies>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
/*
==============================================
# Licensed Materials - Property of IBM
# Copyright IBM Corp. 2018, 2021
# Copyright IBM Corp. 2018, 2022
==============================================
*/

/*
==============================================
First created on: Nov/24/2020
Last modified on: May/10/2022
Last modified on: May/16/2022

A) What does this example application do?
--------------------------------------
Expand Down Expand Up @@ -547,6 +547,13 @@ public composite VgwDataRouter {
// given time by all the given speech processors that are configured to run.
mutable list<int32> _speechProcessorStatusList =
prepareIdleSpeechProcessorsList($totalNumberOfSpeechProcessorJobs);
// This variable tells us the speech processor id that was
// chosen to process a most recently arrived new voice call.
// This will help us in choosing the speech processors in a
// round robin fashion to evenly distribute the voice calls across
// all the speech processors thereby avoiding the overcrowding of
// just a few speech processors most of the time.
mutable int32 _recentlyChosenSpeechProcessorId = -1;
// This map tells us which speech processor is processing a given vgwSessionId.
// Key=vgwSessionId, Value=Speech Processor Id.
mutable map<rstring, int32> _vgwSessionIdToSpeechProcessorMap = {};
Expand Down Expand Up @@ -591,10 +598,12 @@ public composite VgwDataRouter {
// send the speech data belonging to this call.
// Store this VGW session id to change the status of this
// voice call from "brand new" to "ongoing".
int32 speechProcessorId =
speechProcessorId =
getSpeechProcessorIdForNewCallProcessing(
_numberOfConcurrentCallsAllowedPerSpeechProcessor,
_speechProcessorStatusList);
_speechProcessorStatusList,
$totalNumberOfSpeechProcessorJobs,
_recentlyChosenSpeechProcessorId);

if(speechProcessorId == -1) {
// This condition should not happen as long as there are enough
Expand All @@ -616,7 +625,12 @@ public composite VgwDataRouter {
// future use as the speech data keeps coming.
insertM(_vgwSessionIdToSpeechProcessorMap, BSD.vgwSessionId, speechProcessorId);
appTrc(Trace.error, "A new call with vgwSessionId=" + BSD.vgwSessionId +
" is being assigned to speech processor id " + (rstring)speechProcessorId);
" is being assigned to speech processor id " + (rstring)speechProcessorId);

// Let us store the currently chosen speech processor id in a
// state variable so that next time when a new call arrives we can
// choose the next speech processor to go in a round robin fashion.
_recentlyChosenSpeechProcessorId = speechProcessorId;
}

// We can prepare this speech data to be sent to the chosen speech processor id.
Expand Down Expand Up @@ -1385,33 +1399,72 @@ public list<int32> prepareIdleSpeechProcessorsList(int32 speechProcessorsCount)
// If all the speech processors are busy at this time, it will return -1.
public int32 getSpeechProcessorIdForNewCallProcessing(
int32 numberOfConcurrentCallsAllowedPerSpeechProcessor,
mutable list<int32> speechProcessorStatusList) {
mutable list<int32> speechProcessorStatusList,
int32 speechProcessorsCount,
int32 recentlyChosenSpeechProcessorId) {
// We can't do much with an empty list.
if (size(speechProcessorStatusList) <= 0) {
return(-1);
}

mutable int32 idx = -1;
// We are given a speech processor id that was earlier chosen for a most recently
// arrived new voice call. Instead of choosing the same speech processor id
// immediately for a newly arriving call right now, let us choose the next available
// speech processor. With this round robin approach, we can give a fair chance to
// every speech processor to process voice calls as they arrive. This will also
// help in spreading the voice call processing somewhat evenly across all the
// available speech processors thereby not overcrowding a few
// speech processors most of the time.
//
// Speech processor ids range from 1 to N. However, the SPL list passed to us above
// ranges from 0 to N-1. That means, the next speech processor status list index that
// we want to check is already being pointed to by the recently chosen
// speech processor id passed above as it is 1 based.
//
mutable boolean speechProcessorAvailable = false;
mutable int32 speechProcessorIdToBeChecked = recentlyChosenSpeechProcessorId;
mutable int32 loopCnt = 1;

for(int32 x in speechProcessorStatusList) {
idx++;

if(x < numberOfConcurrentCallsAllowedPerSpeechProcessor) {
// If the recently chosen speech processor id is the very last one,
// let us start from the beginning of the list. We will do the same if this is the
// very first time this function is getting called with the recently
// chosen speech processor id set to -1.
if(recentlyChosenSpeechProcessorId == -1 ||
recentlyChosenSpeechProcessorId == speechProcessorsCount) {
speechProcessorIdToBeChecked = 0;
}

// We can try to get one out of the configured number of speech processors as long as
// there exists a speech processor that is not fully booked at this time.
while(loopCnt++ <= speechProcessorsCount) {
// Can we choose this speech processor if it has
// any available slots to take a new voice call for processing?
if(speechProcessorStatusList[speechProcessorIdToBeChecked] <
numberOfConcurrentCallsAllowedPerSpeechProcessor) {
// This speech processor is not handling its max allowed concurrent calls.
// This speech processor id is available.
speechProcessorAvailable = true;
break;
}
} // End of for loop.

// If we are at the end of the list, let us wrap around to
// the beginning of the list now.
if(++speechProcessorIdToBeChecked >= speechProcessorsCount) {
speechProcessorIdToBeChecked = 0;
}
} // End of while loop.

if(speechProcessorAvailable == false) {
// All the speech processors are busy at this time.
// This is not good news for a newly arrived voice call.
return(-1);
}

// A newly arrived call's processing will be assigned to this speech processor.
// Let us increment the given speech processor's current call handling count.
speechProcessorStatusList[idx] = speechProcessorStatusList[idx] + 1;
// Since it is a zero based index array, we will return a value by adding 1 to it.
return(idx + 1);
} else {
// A newly arrived call's processing will be assigned to this speech processor.
// Let us increment the given speech processor's current call handling count.
speechProcessorStatusList[speechProcessorIdToBeChecked] =
speechProcessorStatusList[speechProcessorIdToBeChecked] + 1;
// Since it is a zero based index array, we will return a value by adding 1 to it.
// As we already mentioned above, we refer to the speech processor ids from 1 to N.
return(speechProcessorIdToBeChecked + 1);
}
}
4 changes: 2 additions & 2 deletions samples/VgwDataRouter/info.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@
<info:identity>
<info:name>VgwDataRouter</info:name>
<info:description>Example that shows how to route VGW speech data to different Speech processor jobs</info:description>
<info:version>1.0.5</info:version>
<info:version>1.0.6</info:version>
<info:requiredProductVersion>4.2.1.6</info:requiredProductVersion>
</info:identity>
<info:dependencies>
<info:toolkit>
<common:name>com.ibm.streamsx.sttgateway</common:name>
<common:version>[2.3.3,7.0.0]</common:version>
<common:version>[2.3.5,7.0.0]</common:version>
</info:toolkit>
<info:toolkit>
<common:name>com.ibm.streamsx.websocket</common:name>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
/*
==============================================
First created on: Nov/24/2020
Last modified on: May/10/2022
Last modified on: May/16/2022

A) What does this example application do?
--------------------------------------
Expand Down Expand Up @@ -496,6 +496,13 @@ public composite VgwDataRouter {
// given time by all the given speech processors that are configured to run.
mutable list<int32> _speechProcessorStatusList =
prepareIdleSpeechProcessorsList($totalNumberOfSpeechProcessorJobs);
// This variable tells us the speech processor id that was
// chosen to process a most recently arrived new voice call.
// This will help us in choosing the speech processors in a
// round robin fashion to evenly distribute the voice calls across
// all the speech processors thereby avoiding the overcrowding of
// just a few speech processors most of the time.
mutable int32 _recentlyChosenSpeechProcessorId = -1;
// This map tells us which speech processor is processing a given vgwSessionId.
// Key=vgwSessionId, Value=Speech Processor Id.
mutable map<rstring, int32> _vgwSessionIdToSpeechProcessorMap = {};
Expand Down Expand Up @@ -541,7 +548,9 @@ public composite VgwDataRouter {
speechProcessorId =
getSpeechProcessorIdForNewCallProcessing(
_numberOfConcurrentCallsAllowedPerSpeechProcessor,
_speechProcessorStatusList);
_speechProcessorStatusList,
$totalNumberOfSpeechProcessorJobs,
_recentlyChosenSpeechProcessorId);

if(speechProcessorId == -1) {
// This condition should not happen as long as there are enough
Expand All @@ -556,13 +565,17 @@ public composite VgwDataRouter {
" next time to handle your maximum expected concurrent calls.");
return;
}

// Let us store the speech processor id of this call for
// future use as the speech data keeps coming.
insertM(_vgwSessionIdToSpeechProcessorMap, BSD.vgwSessionId, speechProcessorId);
appTrc(Trace.error, "A new call with vgwSessionId=" + BSD.vgwSessionId +
" is being assigned to speech processor id " + (rstring)speechProcessorId);


// Let us store the currently chosen speech processor id in a
// state variable so that next time when a new call arrives we can
// choose the next speech processor to go in a round robin fashion.
_recentlyChosenSpeechProcessorId = speechProcessorId;
} else {
// This incoming tuple carries speech data belonging to a
// voice call that we have already started processing.
Expand Down Expand Up @@ -928,33 +941,72 @@ public list<int32> prepareIdleSpeechProcessorsList(int32 speechProcessorsCount)
// If all the speech processors are busy at this time, it will return -1.
public int32 getSpeechProcessorIdForNewCallProcessing(
int32 numberOfConcurrentCallsAllowedPerSpeechProcessor,
mutable list<int32> speechProcessorStatusList) {
mutable list<int32> speechProcessorStatusList,
int32 speechProcessorsCount,
int32 recentlyChosenSpeechProcessorId) {
// We can't do much with an empty list.
if (size(speechProcessorStatusList) <= 0) {
return(-1);
}

mutable int32 idx = -1;
// We are given a speech processor id that was earlier chosen for a most recently
// arrived new voice call. Instead of choosing the same speech processor id
// immediately for a newly arriving call right now, let us choose the next available
// speech processor. With this round robin approach, we can give a fair chance to
// every speech processor to process voice calls as they arrive. This will also
// help in spreading the voice call processing somewhat evenly across all the
// available speech processors thereby not overcrowding a few
// speech processors most of the time.
//
// Speech processor ids range from 1 to N. However, the SPL list passed to us above
// ranges from 0 to N-1. That means, the next speech processor status list index that
// we want to check is already being pointed to by the recently chosen
// speech processor id passed above as it is 1 based.
//
mutable boolean speechProcessorAvailable = false;
mutable int32 speechProcessorIdToBeChecked = recentlyChosenSpeechProcessorId;
mutable int32 loopCnt = 1;

for(int32 x in speechProcessorStatusList) {
idx++;

if(x < numberOfConcurrentCallsAllowedPerSpeechProcessor) {
// If the recently chosen speech processor id is the very last one,
// let us start from the beginning of the list. We will do the same if this is the
// very first time this function is getting called with the recently
// chosen speech processor id set to -1.
if(recentlyChosenSpeechProcessorId == -1 ||
recentlyChosenSpeechProcessorId == speechProcessorsCount) {
speechProcessorIdToBeChecked = 0;
}

// We can try to get one out of the configured number of speech processors as long as
// there exists a speech processor that is not fully booked at this time.
while(loopCnt++ <= speechProcessorsCount) {
// Can we choose this speech processor if it has
// any available slots to take a new voice call for processing?
if(speechProcessorStatusList[speechProcessorIdToBeChecked] <
numberOfConcurrentCallsAllowedPerSpeechProcessor) {
// This speech processor is not handling its max allowed concurrent calls.
// This speech processor id is available.
speechProcessorAvailable = true;
break;
}
} // End of for loop.

// If we are at the end of the list, let us wrap around to
// the beginning of the list now.
if(++speechProcessorIdToBeChecked >= speechProcessorsCount) {
speechProcessorIdToBeChecked = 0;
}
} // End of while loop.

if(speechProcessorAvailable == false) {
// All the speech processors are busy at this time.
// This is not good news for a newly arrived voice call.
return(-1);
}

// A newly arrived call's processing will be assigned to this speech processor.
// Let us increment the given speech processor's current call handling count.
speechProcessorStatusList[idx] = speechProcessorStatusList[idx] + 1;
// Since it is a zero based index array, we will return a value by adding 1 to it.
return(idx + 1);
} else {
// A newly arrived call's processing will be assigned to this speech processor.
// Let us increment the given speech processor's current call handling count.
speechProcessorStatusList[speechProcessorIdToBeChecked] =
speechProcessorStatusList[speechProcessorIdToBeChecked] + 1;
// Since it is a zero based index array, we will return a value by adding 1 to it.
// As we already mentioned above, we refer to the speech processor ids from 1 to N.
return(speechProcessorIdToBeChecked + 1);
}
}
4 changes: 2 additions & 2 deletions samples/VgwDataRouterMini/info.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@
<info:identity>
<info:name>VgwDataRouterMini</info:name>
<info:description>Example that shows how to route VGW speech data to different Speech processor jobs</info:description>
<info:version>1.0.5</info:version>
<info:version>1.0.6</info:version>
<info:requiredProductVersion>4.2.1.6</info:requiredProductVersion>
</info:identity>
<info:dependencies>
<info:toolkit>
<common:name>com.ibm.streamsx.sttgateway</common:name>
<common:version>[2.3.3,7.0.0]</common:version>
<common:version>[2.3.5,7.0.0]</common:version>
</info:toolkit>
<info:toolkit>
<common:name>com.ibm.streamsx.websocket</common:name>
Expand Down
2 changes: 1 addition & 1 deletion sttgateway-tech-brief.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
============================================================
First created on: July/01/2018
Last modified on: May/11/2022
Last modified on: May/16/2022

Purpose of this toolkit
-----------------------
Expand Down

0 comments on commit a55787d

Please sign in to comment.