Commit

Increased the CPU yield time and added a new parameter to control the custom metrics live update.
Senthil Nathan committed Sep 22, 2018
1 parent 268be31 commit 0d163b6
Showing 13 changed files with 127 additions and 55 deletions.
10 changes: 9 additions & 1 deletion CHANGELOG.md
@@ -1,4 +1,12 @@
Changes
=======
## v1.0.1:
* Sep/21/2018
* In the internal threads of the WatsonSTT operator code, the CPU yield time during idleness and in between the Websocket connection attempts was increased from a few milliseconds to 1 second.
* New operator parameter sttLiveMetricsUpdateNeeded was added to give the users a way to turn the custom operator metrics reporting on and off in the time critical path of audio transcription.
* Use of the internal custom metrics update API was changed from setValue to setValueNoLock.
* Corresponding documentation refinements were also done.

## v1.0.0:
* Very first release that is tested to work well with the Watson STT cloud service.
* Sep/17/2018
* Very first release of this toolkit that was tested to support all the major features available in the IBM Watson Speech To Text (STT) cloud service.
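
The first v1.0.1 item above (a longer CPU yield while idle) can be pictured with a small, self-contained C++ sketch. It is not the operator's code: `pickYieldTime` and its parameters are hypothetical names chosen for this note, and `std::chrono` durations stand in for whatever blocking call the operator actually uses. It only illustrates the idea of waiting about 1 second when no audio is queued and a short, configurable time otherwise.

```cpp
#include <chrono>
#include <cstddef>

// Hypothetical helper (not part of the toolkit): decide how long a worker
// thread should yield the CPU before its next pass through the loop.
std::chrono::milliseconds pickYieldTime(std::size_t queuedAudioBlobs,
                                        std::size_t queuedAudioFiles,
                                        std::chrono::milliseconds busyYield) {
    // Nothing queued: back off for a full second so an idle thread
    // does not spin.
    if (queuedAudioBlobs == 0 && queuedAudioFiles == 0) {
        return std::chrono::milliseconds(1000);
    }
    // Work is waiting: yield only briefly (possibly not at all if the
    // caller passes zero) and get back to it.
    return busyYield;
}
```

A caller could pass the result to `std::this_thread::sleep_for` at the top of each loop iteration.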
2 changes: 1 addition & 1 deletion GRADUATION_STATUS.md
@@ -13,7 +13,7 @@ Incubation for projects is covered here: https://github.com/IBMStreams/administr
- [ ] :white_check_mark: Project has samples and instructions to help users get started easily
* Status: Multiple working samples exist
- [ ] :large_orange_diamond: Sufficient testing
* Status: Some scale tests for UDP parallel width higher than 100 are still pending.
* Status: Scale tests for an UDP parallel width of 1200 were completed. If a higher number of parallel channels needs to be supported, then that testing has not yet been done. In addition, a thorough functional and scale testing of this toolkit with the IBM Watson STT on IBM Cloud Private (ICP) is still pending.
- [ ] :white_check_mark: Instructions on how to get started with development and contributing to project
* Status: Clear description exists to get started with this toolkit.
- [ ] :white_check_mark: Website that contains information and documentation of the project
8 changes: 0 additions & 8 deletions LICENSE.md

This file was deleted.

33 changes: 23 additions & 10 deletions README.md
@@ -1,19 +1,22 @@
# README for the STT Gateway toolkit for IBM Streams
# STT Gateway toolkit for IBM Streams

## Purpose
This toolkit is designed to ingest audio data either stored in files (.wav, .mp3 etc. for a batch workload) or streamed through a network switch (for a real-time workload). It then transcribes that audio into text via the IBM Watson STT (Speech To Text) service running on the IBM public cloud or on the IBM Cloud Private (ICP).

## Documentation
1. The official toolkit documentation is available at this URL: https://ibmstreams.github.io/streamsx.sttgateway/
1. The official toolkit documentation with extensive details is available at this URL: https://ibmstreams.github.io/streamsx.sttgateway/

2. Another file named sttgateway-tech-brief.txt available at this tooolkit's top-level directory also provides a good amount of information about what this toolkit does, how it can be built and how it can be used in the Streams applications.
2. A file named sttgateway-tech-brief.txt available at this tooolkit's top-level directory also provides a good amount of information about what this toolkit does, how it can be built and how it can be used in the IBM Streams applications.

## Requirements
1. Network connectivity to the Watson Speech To Text (STT) service running either on the public or the private cloud is needed from the IBM Streams Linux machines where this toolkit will be used.
1. Network connectivity to the IBM Watson Speech To Text (STT) service running either on the public or the private cloud is needed from the IBM Streams Linux machines where this toolkit will be used.

2. A valid authentication token is needed to use the Watson STT service. This toolkit uses Websocket to communicate with the Watson STT cloud service. For that Websocket interface, one must use the auth tokens and not the usual cloud service credentials. So, users of this toolkit must generate their own authentication token and provide it when launching the Streams application(s) that will have a dependency on this toolkit. To generate your own auth token, please do more reading from [here](https://console.bluemix.net/docs/services/speech-to-text/input.html#tokens).

3. On the IBM Streams application development machine (where the application code is compiled to create the application bundle), it is necessary to download and install the boost_1_67_0 or a higher version as well as the websocketpp version 0.8.1. Please note that this is not needed on the Streams application execution machines. For the steps required to meet this requirement, please refer to the file named sttgateway-tech-brief.txt available at this tooolkit's top-level directory.
3. On the IBM Streams application development machine (where the application code is compiled to create the application bundle), it is necessary to download and install the boost_1_67_0 or a higher version as well as the websocketpp version 0.8.1. Please note that this is not needed on the Streams application execution machines. For the essential steps to meet this requirement, please refer to the above-mentioned documentation URL or a file named sttgateway-tech-brief.txt available at this tooolkit's top-level directory.

## Example usage of this toolkit inside a Streams application
Here is a code snippet that shows how to invoke the WatsonSTT operator available in this toolkit:
Here is a code snippet that shows how to invoke the WatsonSTT operator available in this toolkit with a subset of supported features:

```
use com.ibm.streamsx.sttgateway.watson::*;
@@ -22,7 +25,9 @@ use com.ibm.streamsx.sttgateway.watson::*;
Invoke one or more instances of the WatsonSTT operator.
You can send the audio data to this operator all at once or
you can send the audio data for the live-use case as it becomes
available from your telephony network switches.
available from your telephony network switch.
Avoid feeding audio data coming from more than one data source into this
parallel region which may cause erroneous transcription results.
NOTE: The WatsonSTT operator allows fusing multiple instances of
this operator into a single PE. This will help in reducing the
@@ -62,10 +67,18 @@ Following IBM Streams job sumission command shows how to override the default va
```
cd streamsx.sttgateway/samples/AudioRawWatsonSTT
make
st submitjob -d <YOUR_STREAMS_DOMAIN> -i <YOUR_STREAMS_INSTANCE> output/com.ibm.streamsx.sttgateway.sample.watsonstt.AudioRawWatsonSTT.sab -P sttAuthToken=<YOUR_WATSON_STT_SERVICE_AUTH_TOKEN> -P sttResultMode=2 -P sttBaseLanguageModel=en-US_NarrowbandModel -P contentType="audio/wav" -P filterProfanity=true -P keywordsSpottingThreshold=0.294 -P keywordsToBeSpotted="['country', 'learning', 'IBM', 'model']" -P smartFormattingNeeded=true -P identifySpeakers=true -P wordTimestampNeeded=true -P wordConfidenceNeeded=true -P wordAlternativesThreshold=0.251 -P maxUtteranceAlternatives=5 -P audioBlobFragmentSize=32768 -P audioDir=<YOUR_AUDIO_FILES_DIRECTORY> -P numberOfSTTEngines=100
st submitjob -d <YOUR_STREAMS_DOMAIN> -i <YOUR_STREAMS_INSTANCE> output/com.ibm.streamsx.sttgateway.sample.watsonstt.AudioRawWatsonSTT.sab -P sttAuthToken=<YOUR_WATSON_STT_SERVICE_AUTH_TOKEN> -P sttResultMode=2 -P sttBaseLanguageModel=en-US_NarrowbandModel -P contentType="audio/wav" -P filterProfanity=true -P keywordsSpottingThreshold=0.294 -P keywordsToBeSpotted="['country', 'learning', 'IBM', 'model']" -P smartFormattingNeeded=true -P identifySpeakers=true -P wordTimestampNeeded=true -P wordConfidenceNeeded=true -P wordAlternativesThreshold=0.251 -P maxUtteranceAlternatives=5 -P audioBlobFragmentSize=32768 -P sttLiveMetricsUpdateNeeded=true -P audioDir=<YOUR_AUDIO_FILES_DIRECTORY> -P numberOfSTTEngines=100
```

## WHATS NEW

v1.0.0
- Initial release of this toolkit with support for all the major features available in the Watson Speech To Text cloud service.
v1.0.1:
- Sep/21/2018
- In the internal threads of the WatsonSTT operator code, the CPU yield time during idleness and in between the Websocket connection attempts was increased from a few milliseconds to 1 second.
- New operator parameter sttLiveMetricsUpdateNeeded was added to give the users a way to turn the custom operator metrics reporting on and off in the time critical path of audio transcription.
- Use of the internal custom metrics update API was changed from setValue to setValueNoLock.
- Corresponding documentation refinements were also done.

v1.0.0:
- Sep/17/2018
- Very first release of this toolkit that was tested to support all the major features available in the IBM Watson Speech To Text (STT) cloud service.
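
The second and third v1.0.1 items above (an on/off switch for live metric updates, and the move from `setValue` to `setValueNoLock`) can be sketched in plain C++. `DemoMetric` and `publishCount` are hypothetical stand-ins written for this note, not the Streams metric API. The sketch assumes that the unlocked setter skips a mutex that the locked one takes, and that only one thread writes the metric.

```cpp
#include <cstdint>
#include <mutex>

// Hypothetical stand-in for an operator metric. This is NOT the SPL
// Metric class; it only mimics a locked and an unlocked setter.
class DemoMetric {
public:
    // Locked update: safe with concurrent writers, costs a mutex acquire.
    void setValue(std::int64_t v) {
        std::lock_guard<std::mutex> guard(mutex_);
        value_ = v;
    }
    // Unlocked update: cheaper, assumes only one thread writes.
    void setValueNoLock(std::int64_t v) { value_ = v; }
    std::int64_t value() const { return value_; }

private:
    std::mutex mutex_;
    std::int64_t value_ = 0;
};

// Hypothetical helper: publish the running count only when live updates
// were requested, mirroring the role of sttLiveMetricsUpdateNeeded.
void publishCount(DemoMetric& metric, bool liveUpdateNeeded, std::int64_t count) {
    if (liveUpdateNeeded) {
        metric.setValueNoLock(count);
    }
}
```

With the flag off, the per-conversation path does no metric work at all; with it on, the update is a plain store rather than a lock acquire plus a store.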
Original file line number Diff line number Diff line change
@@ -36,7 +36,7 @@
See the samples folder inside this toolkit for working examples that show how to use this operator.

For a detailed documentation about the operator design, usage patterns and in-depth technical
details, please refer to the STT Gateway toolkit documentation available at this URL:
details, please refer to the official STT Gateway toolkit documentation available at this URL:

[https://ibmstreams.github.io/streamsx.sttgateway]
</description>
@@ -45,26 +45,37 @@
<iconUri size="32">watsonstt_32.gif</iconUri>

<metrics>
<description>
A few custom metrics are available for the WatsonSTT operator. The Counter kind
metrics listed below will be updated when the operator starts.
But, the Gauge kind metrics will be updated live during
transcription only when the sttLiveMetricsUpdateNeeded
operator parameter is set to true.
</description>

<metric>
<name>nSTTResultMode</name>
<description>STT result mode currently in effect for a given operator instance.</description>
<kind>Counter</kind>
</metric>

<metric>
<name>nWebsocketConnectionAttempts</name>
<description>The number of Websocket connection attempts made by this operator instance.</description>
<description>Number of STT service Websocket connection attempts made by this operator instance.</description>
<kind>Counter</kind>
</metric>

<metric>
<name>nFullAudioConversationsReceived</name>
<description>The number of full audio conversations received by this operator instance.</description>
<description>Number of full audio conversations received for transcription by this operator instance.</description>
<kind>Gauge</kind>
</metric>

<metric>
<name>nFullAudioConversationsTranscribed</name>
<description>The number of full audio conversations transcribed by this operator instance.</description>
<description>Number of full audio conversations transcribed by this operator instance.</description>
<kind>Gauge</kind>
</metric>
<metric>
<name>nSTTResultMode</name>
<description>Current result mode for this WatsonSTT operator instance.</description>
<kind>Counter</kind>
</metric>
</metrics>

<customOutputFunctions>
@@ -444,6 +455,16 @@
<cardinality>1</cardinality>
</parameter>

<parameter>
<name>sttLiveMetricsUpdateNeeded</name>
<description>This parameter specifies whether live update for this operator's custom metrics is needed. (Default is true)</description>
<optional>true</optional>
<rewriteAllowed>true</rewriteAllowed>
<expressionMode>AttributeFree</expressionMode>
<type>boolean</type>
<cardinality>1</cardinality>
</parameter>

</parameters>
<inputPorts>
<inputPortSet>
Original file line number Diff line number Diff line change
@@ -158,20 +158,27 @@ using websocketpp::lib::bind;
my $maxAllowedConnectionAttempts = $model->getParameterByName("maxAllowedConnectionAttempts");
# Default: 10 attempts
$maxAllowedConnectionAttempts = $maxAllowedConnectionAttempts ? $maxAllowedConnectionAttempts->getValueAt(0)->getCppExpression() : 10;

my $sttLiveMetricsUpdateNeeded = $model->getParameterByName("sttLiveMetricsUpdateNeeded");
$sttLiveMetricsUpdateNeeded = $sttLiveMetricsUpdateNeeded ? $sttLiveMetricsUpdateNeeded->getValueAt(0)->getCppExpression() : 1;
%>

<%SPL::CodeGen::implementationPrologue($model);%>

// Constructor
MY_OPERATOR::MY_OPERATOR()
{
// They are already defined in the operator model XML file. Hence, there is no need to explicitly create them here.
// Custom metrics for this operator are already defined in the operator model XML file.
// Hence, there is no need to explicitly create them here.
// Simply get the custom metrics already defined for this operator.
// We will update the Counter kind custom metrics when the operator starts.
// We will update the Gauge kind custom metrics during transcription only when the
// sttLiveMetricsUpdateNeeded optional operator parameter is set to true.
OperatorMetrics & opm = getContext().getMetrics();
nSTTResultModeMetric = & opm.getCustomMetricByName("nSTTResultMode");
nWebsocketConnectionAttemptsMetric = & opm.getCustomMetricByName("nWebsocketConnectionAttempts");
nFullAudioConversationsReceivedMetric = & opm.getCustomMetricByName("nFullAudioConversationsReceived");
nFullAudioConversationsTranscribedMetric = & opm.getCustomMetricByName("nFullAudioConversationsTranscribed");
nSTTResultModeMetric = & opm.getCustomMetricByName("nSTTResultMode");

numberOfAudioBlobFragmentsReceivedInCurrentConversation = 0;
numberOfFullAudioConversationsReceived = 0;
@@ -298,6 +305,8 @@ MY_OPERATOR::MY_OPERATOR()
" Valid value must be greater than or equal to 1.");
}

sttLiveMetricsUpdateNeeded = <%=$sttLiveMetricsUpdateNeeded%>;

// We are not going to support the following utterance based
// features when the STT result mode is 3 (full transcript).
// Many of these features return the results in individual arrays for a
@@ -323,7 +332,7 @@ MY_OPERATOR::MY_OPERATOR()
}

// Update the operator metric.
nSTTResultModeMetric->setValue(sttResultMode);
nSTTResultModeMetric->setValueNoLock(sttResultMode);
}

// Destructor
@@ -549,8 +558,10 @@ void MY_OPERATOR::process(Tuple & tuple, uint32_t port)
numberOfFullAudioConversationsReceived++;
} // End of if (audioInputAsBlob == true)

// Update the operator metric.
nFullAudioConversationsReceivedMetric->setValue(numberOfFullAudioConversationsReceived);
// Update the operator metric only if the user asked for a live update.
if (sttLiveMetricsUpdateNeeded == true) {
nFullAudioConversationsReceivedMetric->setValueNoLock(numberOfFullAudioConversationsReceived);
}

// Let us do all the auto output tuple attribute assignments and store it in a
// list to be used in the on_message event handler method below at the time of
@@ -617,15 +628,26 @@ void MY_OPERATOR::ws_audio_blob_sender() {
while (!getPE().getShutdownRequested()) {
// Keep waiting in this while loop until
// there is some work that needs to be performed.
// Wait for a configured amount of time that is not 0.0.
if (cpuYieldTimeInAudioSenderThread > 0.0) {
// Wait for a configured amount of time that is not 0.0 when there is
// audio data actively available for processing.
// When there is no audio data available, yield the CPU for
// slightly a longer time.
int32_t audioBytesVectorSize = audioBytes.size();
int32_t audioFilesVectorSize = audioFiles.size();
if(audioBytesVectorSize <= 0 && audioFilesVectorSize <= 0) {
// There is no audio data available at this time.
// Yield the CPU for slightly a longer time.
// 1 second instead of 1 msec.
SPL::Functions::Utility::block(1.0);
} else if (cpuYieldTimeInAudioSenderThread > 0.0) {
// Audio data available for processing. Yield the CPU briefly and get to work soon.
// Even a tiny value of 1 millisecond (0.001 second) will yield the
// CPU and will not show 0% idle in the Linux top command.
SPL::Functions::Utility::block(cpuYieldTimeInAudioSenderThread);
}

// Check if the Websocket connection needs to be established.
if ((audioBytes.size() > 0 || audioFiles.size() > 0) &&
if ((audioBytesVectorSize > 0 || audioFilesVectorSize > 0) &&
wsConnectionEstablished == false) {
// When there is audio data waiting to be processed and
// if the Websocket connection is not active at that time,
@@ -667,7 +689,7 @@ void MY_OPERATOR::ws_audio_blob_sender() {
websocketConnectionErrorOccurred = false;
makeNewWebsocketConnection = true;
// Update the operator metric.
nWebsocketConnectionAttemptsMetric->setValue(numberOfWebsocketConnectionAttempts);
nWebsocketConnectionAttemptsMetric->setValueNoLock(numberOfWebsocketConnectionAttempts);

// After a successful connection, makeNewWebsocketConnection will be
// set to false in the ws_init method below.
@@ -707,15 +729,15 @@ void MY_OPERATOR::ws_audio_blob_sender() {
SPLAPPTRC(L_DEBUG, "Operator " << operatorPhysicalName <<
"-->Channel " << boost::to_string(udpChannelNumber) <<
"-->Reached 9", "reestablish_ws_connection");
SPL::Functions::Utility::block(0.250);
SPL::Functions::Utility::block(0.500);
SPLAPPTRC(L_DEBUG, "Operator " << operatorPhysicalName <<
"-->Channel " << boost::to_string(udpChannelNumber) <<
"-->Reached 10", "reestablish_ws_connection");
} // End of the while loop.

// Continue from the top of the outer loop.
continue;
} // End of the if block
} // End of the if segment

// It is a special delay to give enough time for the
// Websocket thread's on_close event handler method below to
@@ -731,7 +753,7 @@ void MY_OPERATOR::ws_audio_blob_sender() {
}

// Check if there is audio data waiting to be sent to the STT service.
if ((audioBytes.size() > 0 || audioFiles.size() > 0) &&
if ((audioBytesVectorSize > 0 || audioFilesVectorSize > 0) &&
wsConnectionEstablished == true &&
statusOfAudioDataTransmissionToSTT != FULL_AUDIO_DATA_SENT_TO_STT) {
// If an error happened in the STT processing (invalid audio data or
@@ -875,7 +897,7 @@ void MY_OPERATOR::ws_audio_blob_sender() {
// Continue from the top of the while loop.
continue;
} // End of if (audioInputAsBlob == false)
} // End of if ((audioBytes.size() > 0 ||
} // End of if ((audioBytesVectorSize > 0 ||
} // End of the while loop
}

@@ -890,8 +912,8 @@ void MY_OPERATOR::ws_init() {
if (makeNewWebsocketConnection == false) {
// Keep waiting in this while loop until
// a need arises to make a new Websocket connection.
// 250 milliseconds wait.
SPL::Functions::Utility::block(0.250);
// 1 second wait.
SPL::Functions::Utility::block(1.0);
continue;
}

@@ -2436,9 +2458,11 @@ void MY_OPERATOR::on_message(MY_OPERATOR::client* c, websocketpp::connection_hdl
}

numberOfFullAudioConversationsTranscribed++;
// Update the operator metric.
nFullAudioConversationsTranscribedMetric->setValue(numberOfFullAudioConversationsTranscribed);

// Update the operator metric only if the user asked for a live update.
if (sttLiveMetricsUpdateNeeded == true) {
nFullAudioConversationsTranscribedMetric->setValueNoLock(numberOfFullAudioConversationsTranscribed);
}

if (sttJsonResponseDebugging == true) {
std::string tempString = "transcription completion.";

Original file line number Diff line number Diff line change
@@ -60,6 +60,7 @@ public:
SPL::float64 cpuYieldTimeInAudioSenderThread;
SPL::float64 waitTimeBeforeSTTServiceConnectionRetry;
SPL::int32 maxAllowedConnectionAttempts;
bool sttLiveMetricsUpdateNeeded;
std::string uri;
std::string authToken;
std::string baseLanguageModel;