Commit

Changes done for v2.2.9.
nysenthil committed Feb 24, 2021
1 parent 1566763 commit 863e59e
Showing 21 changed files with 1,472 additions and 1,391 deletions.
3 changes: 1 addition & 2 deletions README.md
@@ -142,8 +142,7 @@ st submitjob -d <YOUR_STREAMS_DOMAIN> -i <YOUR_STREAMS_INSTANCE> output/co
If you are planning to ingest the speech data from live voice calls, then you can invoke the **IBMVoiceGatewaySource** operator as shown below.

```
-(stream<BinarySpeech_t> BinarySpeechData as BSD;
-stream<EndOfCallSignal_t> EndOfCallSignal as EOCS) as VoiceGatewayInferface =
+(stream<BinarySpeech_t> BinarySpeechData as BSD) as VoiceGatewayInferface =
IBMVoiceGatewaySource() {
logic
state: {
5 changes: 5 additions & 0 deletions com.ibm.streamsx.sttgateway/CHANGELOG.md
@@ -1,5 +1,10 @@
# Changes

+## v2.2.9
+* Feb/11/2021
+* Removed the EndOfCallSignal (EOCS) output stream completely to avoid port locks and out-of-order processing between the binary speech data (BSD) and EOCS tuples. A single output stream now delivers both the BSD and EOCS tuples in the correct sequence for downstream processing.
+* The change described above required foundational changes in the IBMVoiceGatewaySource operator and in the examples that invoke that operator.
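
The ordering argument above can be sketched from the consumer's side. The following is a minimal C++ illustration, not code from this toolkit: `VoiceTuple`, `ChannelSummary`, and `ChannelTracker` are hypothetical names, and the struct only mirrors the first five attributes of `BinarySpeech_t`. It assumes tuples arrive in the order the operator submitted them on its single output port, so the EOCS tuple for a channel is seen only after all of that channel's speech tuples.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <map>
#include <string>
#include <vector>

// Hypothetical plain-C++ mirror of the first five BinarySpeech_t attributes.
struct VoiceTuple {
    std::vector<unsigned char> speech;  // left empty on an EOCS tuple
    std::string vgwSessionId;
    bool isCustomerSpeechData = false;
    int32_t vgwVoiceChannelNumber = 0;
    bool endOfCallSignal = false;       // true: EOCS, false: binary speech data
};

// Illustrative result produced when a voice channel ends.
struct ChannelSummary {
    std::string id;                     // vgwSessionId_vgwVoiceChannelNumber
    std::size_t totalBytes = 0;
    bool isCustomer = false;
};

// Hypothetical downstream consumer that reads one ordered stream.
class ChannelTracker {
public:
    // Returns true and fills `out` when `t` is an EOCS tuple.
    // Speech tuples only update the per-channel byte count.
    bool consume(const VoiceTuple& t, ChannelSummary& out) {
        assert(!t.vgwSessionId.empty());
        const std::string id =
            t.vgwSessionId + "_" + std::to_string(t.vgwVoiceChannelNumber);
        if (!t.endOfCallSignal) {
            bytesSeen_[id] += t.speech.size();
            return false;
        }
        // Because speech and EOCS share one stream, every speech tuple for
        // this channel has already been counted when the EOCS is consumed.
        out.id = id;
        out.totalBytes = bytesSeen_[id];
        out.isCustomer = t.isCustomerSpeechData;
        bytesSeen_.erase(id);
        return true;
    }

    std::size_t openChannels() const { return bytesSeen_.size(); }

private:
    std::map<std::string, std::size_t> bytesSeen_;
};
```

With two output ports, the same bookkeeping would likely need extra synchronization, since an EOCS tuple could overtake speech tuples still in flight on the other port.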

## v2.2.8
* Feb/07/2021
* Modified the IBMVoiceGatewaySource operator to handle the exception thrown when a given websocket connection handle can't be found in the connection metadata map.
@@ -304,6 +304,25 @@
information and assign those metadata values to other optional attributes in this
output port.

+In addition to sending the binary speech data on this port, this operator
+also sends an End Of Call Signal (EOCS) on this port whenever a particular
+voice channel of an ongoing call closes its WebSocket connection. Each EOCS
+tuple indicates the end of a specific speaker (i.e. channel) in a voice call
+that was in progress moments ago for the given IBM Voice Gateway session id.
+When it sends an EOCS, the operator sets values only for the following
+attributes of the output stream.
+rstring vgwSessionId, boolean isCustomerSpeechData, int32 vgwVoiceChannelNumber, boolean endOfCallSignal
+This source operator sets these attributes to indicate which particular speaker
+(i.e. voice channel number) of a given voice call (i.e. session id) just ended
+the conversation. The isCustomerSpeechData attribute tells whether that recently
+ended voice channel carried the speech data of a customer or an agent. More
+importantly, the operator sets the endOfCallSignal attribute to true to indicate
+that the tuple is an EOCS message and not a binary speech message. The same
+output port carries both kinds of messages in order to avoid the port locks and
+tuple ordering issues that could occur with two different output ports.
+Downstream operators can make use of this "End Of Voice Call" signal as they
+see fit.

**There are multiple available output functions**, and output attributes can also be
assigned values with any SPL expression that evaluates to the proper type.
</description>
@@ -319,31 +338,7 @@
<tupleMutationAllowed>false</tupleMutationAllowed>
<cardinality>1</cardinality>
<optional>false</optional>
-</outputPortSet>
-
-<outputPortSet>
-<description>
-This port produces periodic output tuples to give an indication about the end of a
-specific speaker (i.e. channel) in a voice call that was in progress moments ago for
-the given IBM Voice Gateway session id. The schema for this port must have these
-three attributes with their correct data types as shown here.
-rstring vgwSessionId, boolean isCustomerSpeechData, int32 vgwVoiceChannelNumber
-This source operator will set the appropriate values for these attributes to
-indicate which particular speaker (i.e. voice channel number) of a given voice call
-(i.e. session id) just ended the conversation. This tuple also has an attribute
-(i.e. isCustomerSpeechData) to tell whether that recently ended voice channel
-carried the speech data of a customer or an agent. Downstream operators can make
-use of this "End Of Voice Call" signal as they see fit.
-</description>
-<expressionMode>Expression</expressionMode>
-<autoAssignment>false</autoAssignment>
-<completeAssignment>false</completeAssignment>
-<rewriteAllowed>true</rewriteAllowed>
-<windowPunctuationOutputMode>Free</windowPunctuationOutputMode>
-<tupleMutationAllowed>false</tupleMutationAllowed>
-<cardinality>1</cardinality>
-<optional>false</optional>
-</outputPortSet>
+</outputPortSet>
</outputPorts>
</cppOperatorModel>
</operatorModel>
@@ -8,7 +8,7 @@
/*
============================================================
First created on: Sep/20/2019
-Last modified on: Feb/07/2021
+Last modified on: Feb/09/2021
Please refer to the sttgateway-tech-brief.txt file in the
top-level directory of this toolkit to read about
@@ -87,6 +87,15 @@ using websocketpp::lib::bind;
my $audioOutputAsBlob = undef;
my $outputAttrs1 = $outputPort1->getAttributes();
my $speechAttributeFound = 0;
+my $vgwSessionIdAsString = undef;
+my $vgwSessionIdAttributeFound = 0;
+my $isCustomerSpeechDataAsBoolean = undef;
+my $isCustomerSpeechDataAttributeFound = 0;
+my $vgwVoiceChannelNumberAsInt32 = undef;
+my $vgwVoiceChannelNumberAttributeFound = 0;
+my $endOfCallSignalAsBoolean = undef;
+my $endOfCallSignalAttributeFound = 0;
+

foreach my $outputAttr (@$outputAttrs1) {
my $outAttrName = $outputAttr->getName();
@@ -100,64 +109,59 @@ using websocketpp::lib::bind;
$audioOutputAsBlob = 1;
}
}
-}
-
-if ($speechAttributeFound == 0 ) {
-SPL::CodeGen::exitln(SttGatewayResource::STTGW_OUT_ATTRIBUTE_CHECK1("IBMVoiceGatewaySource", "speech"),
-$model->getContext()->getSourceLocation());
-}
-
-if (!(defined($audioOutputAsBlob))) {
-SPL::CodeGen::exitln(SttGatewayResource::STTGW_OUT_ATTRIBUTE_TYPE_CHECK1("IBMVoiceGatewaySource", "speech", "blob"),
-$model->getContext()->getSourceLocation());
-}
-
-# Check the output port number 1 i.e. the second output port.
-my $outputPort2 = $model->getOutputPortAt(1);
-my $outputTupleName2 = $outputPort2->getCppTupleName();
-my $vgwSessionIdAsString = undef;
-my $outputAttrs2 = $outputPort2->getAttributes();
-my $vgwSessionIdAttributeFound = 0;
-my $isCustomerSpeechDataAsBoolean = undef;
-my $isCustomerSpeechDataAttributeFound = 0;
-my $vgwVoiceChannelNumberAsInt32 = undef;
-my $vgwVoiceChannelNumberAttributeFound = 0;
-
-foreach my $outputAttr2 (@$outputAttrs2) {
-my $outAttrName2 = $outputAttr2->getName();
-my $outAttrType2 = $outputAttr2->getSPLType();

-if ($outAttrName2 eq "vgwSessionId") {
+if ($outAttrName eq "vgwSessionId") {
$vgwSessionIdAttributeFound = 1;

-if ($outAttrType2 eq "rstring") {
+if ($outAttrType eq "rstring") {
# This tuple attribute will carry the Voice Gateway Session Id.
$vgwSessionIdAsString = 1;
}
}

-if ($outAttrName2 eq "isCustomerSpeechData") {
+if ($outAttrName eq "isCustomerSpeechData") {
$isCustomerSpeechDataAttributeFound = 1;

-if ($outAttrType2 eq "boolean") {
+if ($outAttrType eq "boolean") {
# This tuple attribute will indicate whether the
# given channel of a given voice call carried the
# speech data of a customer or an agent.
$isCustomerSpeechDataAsBoolean = 1;
}
}

-if ($outAttrName2 eq "vgwVoiceChannelNumber") {
+if ($outAttrName eq "vgwVoiceChannelNumber") {
$vgwVoiceChannelNumberAttributeFound = 1;

-if ($outAttrType2 eq "int32") {
+if ($outAttrType eq "int32") {
# This tuple attribute will indicate the
# channel number of given voice call.
$vgwVoiceChannelNumberAsInt32 = 1;
}
}
+
+if ($outAttrName eq "endOfCallSignal") {
+$endOfCallSignalAttributeFound = 1;
+
+if ($outAttrType eq "boolean") {
+# This tuple attribute will indicate whether the
+# given channel of a given voice call has ended
+# sending speech data by closing its WebSocket connection.
+$endOfCallSignalAsBoolean = 1;
+}
+}
}
+
+if ($speechAttributeFound == 0 ) {
+SPL::CodeGen::exitln(SttGatewayResource::STTGW_OUT_ATTRIBUTE_CHECK1("IBMVoiceGatewaySource", "speech"),
+$model->getContext()->getSourceLocation());
+}
+
+if (!(defined($audioOutputAsBlob))) {
+SPL::CodeGen::exitln(SttGatewayResource::STTGW_OUT_ATTRIBUTE_TYPE_CHECK1("IBMVoiceGatewaySource", "speech", "blob"),
+$model->getContext()->getSourceLocation());
+}

if ($vgwSessionIdAttributeFound == 0 ) {
SPL::CodeGen::exitln(SttGatewayResource::STTGW_OUT_ATTRIBUTE_CHECK2("IBMVoiceGatewaySource", "vgwSessionId"),
$model->getContext()->getSourceLocation());
@@ -187,6 +191,16 @@ using websocketpp::lib::bind;
SPL::CodeGen::exitln(SttGatewayResource::STTGW_OUT_ATTRIBUTE_TYPE_CHECK2("IBMVoiceGatewaySource", "vgwVoiceChannelNumber", "int32"),
$model->getContext()->getSourceLocation());
}
+
+if ($endOfCallSignalAttributeFound == 0 ) {
+SPL::CodeGen::exitln(SttGatewayResource::STTGW_OUT_ATTRIBUTE_CHECK2("IBMVoiceGatewaySource", "endOfCallSignal"),
+$model->getContext()->getSourceLocation());
+}
+
+if (!(defined($endOfCallSignalAsBoolean))) {
+SPL::CodeGen::exitln(SttGatewayResource::STTGW_OUT_ATTRIBUTE_TYPE_CHECK2("IBMVoiceGatewaySource", "endOfCallSignal", "boolean"),
+$model->getContext()->getSourceLocation());
+}

# Following are the operator parameters.
my $tlsPort = $model->getParameterByName("tlsPort");
@@ -1035,11 +1049,12 @@ void MY_OPERATOR::on_message(EndpointType* s, websocketpp::connection_hdl hdl,
if (vgwSessionIdFoundInMap == true) {
// Send the "End of Voice Call" signal now for this
// vgwSessionId_vgwVoiceChannelNumber combo.
-OPort1Type oTuple;
+OPort0Type oTuple;
oTuple.set_vgwSessionId(con_metadata.vgwSessionId);
oTuple.set_isCustomerSpeechData(con_metadata.vgwIsCaller);
oTuple.set_vgwVoiceChannelNumber(con_metadata.vgwVoiceChannelNumber);
-submit(oTuple, 1);
+oTuple.set_endOfCallSignal(true);
+submit(oTuple, 0);

if (vgwSessionLoggingNeeded == true) {
SPLAPPTRC(L_ERROR, "Operator " << operatorPhysicalName <<
@@ -1135,17 +1150,19 @@ void MY_OPERATOR::on_message(EndpointType* s, websocketpp::connection_hdl hdl,
// vgwSessionId_vgwVoiceChannelNumber combo.
// Send it for voice channel 1 which is an
// agent channel most of the time.
-OPort1Type oTuple;
+OPort0Type oTuple;
oTuple.set_vgwSessionId(*it);
oTuple.set_isCustomerSpeechData(false);
oTuple.set_vgwVoiceChannelNumber(1);
-submit(oTuple, 1);
+oTuple.set_endOfCallSignal(true);
+submit(oTuple, 0);
// Do the same for voice channel 2 which is a
// customer channel most of the time.
oTuple.set_vgwSessionId(*it);
oTuple.set_isCustomerSpeechData(true);
oTuple.set_vgwVoiceChannelNumber(2);
-submit(oTuple, 1);
+oTuple.set_endOfCallSignal(true);
+submit(oTuple, 0);

// We have a map where the agent and caller phone numbers of a given
// call session id are stored. Since this call has gone stale,
@@ -1205,11 +1222,12 @@ void MY_OPERATOR::on_message(EndpointType* s, websocketpp::connection_hdl hdl,
// do its own clean-up and release of the STT engines.
// Send the "End of Voice Call" signal now for this
// vgwSessionId_vgwVoiceChannelNumber combo.
-OPort1Type oTuple;
+OPort0Type oTuple;
oTuple.set_vgwSessionId(cmd.vgwSessionId);
oTuple.set_isCustomerSpeechData(cmd.vgwIsCaller);
oTuple.set_vgwVoiceChannelNumber(cmd.vgwVoiceChannelNumber);
-submit(oTuple, 1);
+oTuple.set_endOfCallSignal(true);
+submit(oTuple, 0);

// Added this logic on Sep/04/2020.
// We have a map where the agent and caller phone numbers of a given
@@ -1368,6 +1386,7 @@ void MY_OPERATOR::on_message(EndpointType* s, websocketpp::connection_hdl hdl,
speechBlob.setData((unsigned char*)payloadBuffer, (uint64_t)payloadSize);
OPort0Type oTuple;
oTuple.set_speech(speechBlob);
+oTuple.set_endOfCallSignal(false);

// Now let us set any attributes that the caller of this operator is trying to
// assign through this operator's output functions.
@@ -1565,11 +1584,12 @@ void MY_OPERATOR::on_close(websocketpp::connection_hdl hdl) {
if (vgwSessionIdFoundInMap == true && con_metadata.vgwVoiceChannelNumber > 0) {
// Send the "End of Voice Call" signal now for this
// vgwSessionId_vgwVoiceChannelNumber combo.
-OPort1Type oTuple;
+OPort0Type oTuple;
oTuple.set_vgwSessionId(con_metadata.vgwSessionId);
oTuple.set_isCustomerSpeechData(con_metadata.vgwIsCaller);
oTuple.set_vgwVoiceChannelNumber(con_metadata.vgwVoiceChannelNumber);
-submit(oTuple, 1);
+oTuple.set_endOfCallSignal(true);
+submit(oTuple, 0);

if (vgwSessionLoggingNeeded == true) {
SPLAPPTRC(L_ERROR, "Operator " << operatorPhysicalName <<
@@ -1688,17 +1708,19 @@ void MY_OPERATOR::on_close(websocketpp::connection_hdl hdl) {
// vgwSessionId_vgwVoiceChannelNumber combo.
// Send it for voice channel 1 which is an
// agent channel most of the time.
-OPort1Type oTuple;
+OPort0Type oTuple;
oTuple.set_vgwSessionId(*it);
oTuple.set_isCustomerSpeechData(false);
oTuple.set_vgwVoiceChannelNumber(1);
-submit(oTuple, 1);
+oTuple.set_endOfCallSignal(true);
+submit(oTuple, 0);
// Do the same for voice channel 2 which is a
// customer channel most of the time.
oTuple.set_vgwSessionId(*it);
oTuple.set_isCustomerSpeechData(true);
oTuple.set_vgwVoiceChannelNumber(2);
-submit(oTuple, 1);
+oTuple.set_endOfCallSignal(true);
+submit(oTuple, 0);

// We have a map where the agent and caller phone numbers of a given
// call session id are stored. Since this call has gone stale,
@@ -1758,11 +1780,12 @@ void MY_OPERATOR::on_close(websocketpp::connection_hdl hdl) {
// do its own clean-up and release of the STT engines.
// Send the "End of Voice Call" signal now for this
// vgwSessionId_vgwVoiceChannelNumber combo.
-OPort1Type oTuple;
+OPort0Type oTuple;
oTuple.set_vgwSessionId(cmd.vgwSessionId);
oTuple.set_isCustomerSpeechData(cmd.vgwIsCaller);
oTuple.set_vgwVoiceChannelNumber(cmd.vgwVoiceChannelNumber);
-submit(oTuple, 1);
+oTuple.set_endOfCallSignal(true);
+submit(oTuple, 0);

// Added this logic on Sep/04/2020.
// We have a map where the agent and caller phone numbers of a given
2 changes: 1 addition & 1 deletion com.ibm.streamsx.sttgateway/info.xml
@@ -14,7 +14,7 @@

**Note:** This toolkit requires c++11 support.
</description>
-<version>2.2.8</version>
+<version>2.2.9</version>
<requiredProductVersion>4.2.1.6</requiredProductVersion>
</identity>
<dependencies>
@@ -1,14 +1,14 @@
/*
==============================================
# Licensed Materials - Property of IBM
-# Copyright IBM Corp. 2018, 2020
+# Copyright IBM Corp. 2018, 2021
==============================================
*/

/*
==============================================
First created on: Nov/24/2020
-Last modified on: Nov/26/2020
+Last modified on: Feb/09/2021

This is a utility composite that will get used in the following applications.

@@ -25,7 +25,7 @@ namespace com.ibm.streamsx.sttgateway.utils;
// Code for the C++ native functions can be found in the impl/include directory of this project.
//
// The following is the schema of the first output stream for the
-// IBMVoiceGatewaySource operator. The first four attributes are
+// IBMVoiceGatewaySource operator. The first five attributes are
// very important and the other ones are purely optional if some
// scenarios really require them.
// blob speech --> Speech fragments of a live conversation as captured and sent by the IBM Voice Gateway.
@@ -36,6 +36,9 @@ namespace com.ibm.streamsx.sttgateway.utils;
// Whoever (caller or agent) sends the first round of
// speech data bytes will get assigned a voice channel of 1.
// The next one to follow will get assigned a voice channel of 2.
+// boolean endOfCallSignal --> This attribute will be set to true by the IBMVoiceGatewaySource
+// operator when it sends an EOCS for a voice channel. It will be
+// set to false by that operator when it sends binary speech data.
// rstring id --> This attribute is needed by the WatsonS2T operator.
// It is set to vgwSessionId_vgwVoiceChannelNumber
// rstring callStartDateTime --> Call start date time i.e. system clock time.
@@ -47,18 +50,11 @@ namespace com.ibm.streamsx.sttgateway.utils;
// int32 speechEngineId --> This attribute will be set in the speech processor job. (Please, read the comments there.)
// int32 speechResultProcessorId --> This attribute will be set in the speech processor job. (Please, read the comments there.)
type BinarySpeech_t = blob speech, rstring vgwSessionId, boolean isCustomerSpeechData,
-int32 vgwVoiceChannelNumber, rstring id, rstring callStartDateTime,
+int32 vgwVoiceChannelNumber, boolean endOfCallSignal,
+rstring id, rstring callStartDateTime,
rstring callerPhoneNumber, rstring agentPhoneNumber,
int32 speechDataFragmentCnt, int32 totalSpeechDataBytesReceived,
int32 speechProcessorId, int32 speechEngineId, int32 speechResultProcessorId;
-// The following schema is for the second output stream of the
-// IBMVoiceGatewaySource operator. It has three attributes indicating
-// the speaker channel (vgwVoiceChannelNumber) of a given voice call (vgwSessionId) who
-// got completed with the call as well as an indicator (isCustomerSpeechData) to
-// denote whether the speech data we received on this channel belonged
-// to a caller or an agent.
-type EndOfCallSignal_t = rstring vgwSessionId,
-boolean isCustomerSpeechData, int32 vgwVoiceChannelNumber;

// The following schema will be for the data being sent here by the
// VgwDataRouter application. It sends us raw binary data which