From c6f5f11377470e252a4cd9d52b3e0635610bfeca Mon Sep 17 00:00:00 2001 From: Jerjou Cheng Date: Fri, 6 Oct 2017 13:42:00 -0700 Subject: [PATCH 1/8] Draft 1: indefinitely-long streaming transcription --- .../transcribe_streaming_indefinite.py | 250 ++++++++++++++++++ 1 file changed, 250 insertions(+) create mode 100644 speech/cloud-client/transcribe_streaming_indefinite.py diff --git a/speech/cloud-client/transcribe_streaming_indefinite.py b/speech/cloud-client/transcribe_streaming_indefinite.py new file mode 100644 index 000000000000..cae1b624ca07 --- /dev/null +++ b/speech/cloud-client/transcribe_streaming_indefinite.py @@ -0,0 +1,250 @@ +#!/usr/bin/env python + +# Copyright 2017 Google Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Google Cloud Speech API sample application using the streaming API. + +NOTE: This module requires the additional dependency `pyaudio`. To install +using pip: + + pip install pyaudio + +Example usage: + python transcribe_streaming_mic.py +""" + +# [START import_libraries] +from __future__ import division + +import collections +import itertools +import re +import sys + +from google.cloud import speech +from google.cloud.speech import enums +from google.cloud.speech import types +from google import gax +import grpc +import pyaudio +from six.moves import queue +# [END import_libraries] + +# Audio recording parameters +RATE = 16000 +CHUNK = int(RATE / 10) # 100ms + + +class MicrophoneStream(object): + """Opens a recording stream as a generator yielding the audio chunks.""" + def __init__(self, rate, chunk_size, max_replay_secs=5): + self._rate = rate + self._chunk_size = chunk_size + self._max_replay_secs = max_replay_secs + + # Create a thread-safe buffer of audio data + self._buff = queue.Queue() + self.closed = True + + def __enter__(self): + num_channels = 1 + self._audio_interface = pyaudio.PyAudio() + self._audio_stream = self._audio_interface.open( + format=pyaudio.paInt16, + # The API currently only supports 1-channel (mono) audio + # https://goo.gl/z757pE + channels=num_channels, rate=self._rate, + input=True, frames_per_buffer=self._chunk_size, + # Run the audio stream asynchronously to fill the buffer object. + # This is necessary so that the input device's buffer doesn't + # overflow while the calling thread makes network requests, etc. + stream_callback=self._fill_buffer, + ) + + self.closed = False + + bytes_per_sample = 2 * num_channels # 2 bytes in 16 bit samples + self._bytes_per_second = self._rate * bytes_per_sample + + bytes_per_chunk = (self._chunk_size * bytes_per_sample) + chunks_per_second = self._bytes_per_second / bytes_per_chunk + self._untranscribed = collections.deque( + maxlen=self._max_replay_secs * chunks_per_second) + + return self + + def __exit__(self, type, value, traceback): + self._audio_stream.stop_stream() + self._audio_stream.close() + self.closed = True + # Signal the generator to terminate so that the client's + # streaming_recognize method will not block the process termination. + self._buff.put(None) + self._audio_interface.terminate() + + def _fill_buffer(self, in_data, frame_count, time_info, status_flags): + """Continuously collect data from the audio stream, into the buffer.""" + self._buff.put(in_data) + return None, pyaudio.paContinue + + def on_transcribe(self, end_time): + while self._untranscribed and end_time > self._untranscribed[0][1]: + self._untranscribed.popleft() + + def generator(self, resume=False): + total_bytes_sent = 0 + if resume: + # Yield all the untranscribed chunks first + for chunk, _ in self._untranscribed: + yield chunk + while not self.closed: + # Use a blocking get() to ensure there's at least one chunk of + # data, and stop iteration if the chunk is None, indicating the + # end of the audio stream. + chunk = self._buff.get() + if chunk is None: + return + data = [chunk] + + # Now consume whatever other data's still buffered. + while True: + try: + chunk = self._buff.get(block=False) + if chunk is None: + return + data.append(chunk) + except queue.Empty: + break + + byte_data = b''.join(data) + + # Populate the replay buffer of untranscribed audio bytes + total_bytes_sent += len(byte_data) + chunk_end_time = total_bytes_sent / self._bytes_per_second + self._untranscribed.append((byte_data, chunk_end_time)) + + yield byte_data +# [END audio_stream] + + +def duration_to_secs(duration): + return duration.seconds + (duration.nanos / float(1e9)) + + +def listen_print_loop(responses, stream): + """Iterates through server responses and prints them. + + The responses passed is a generator that will block until a response + is provided by the server. + + Each response may contain multiple results, and each result may contain + multiple alternatives; for details, see https://goo.gl/tjCPAU. Here we + print only the transcription for the top alternative of the top result. + + In this case, responses are provided for interim results as well. If the + response is an interim one, print a line feed at the end of it, to allow + the next result to overwrite it, until the response is a final one. For the + final one, print a newline to preserve the finalized transcription. + """ + num_chars_printed = 0 + for response in responses: + if not response.results: + continue + + # The `results` list is consecutive. For streaming, we only care about + # the first result being considered, since once it's `is_final`, it + # moves on to considering the next utterance. + result = response.results[0] + if not result.alternatives: + continue + + top_alternative = result.alternatives[0] + # Display the transcription of the top alternative. + transcript = top_alternative.transcript + + # Display interim results, but with a carriage return at the end of the + # line, so subsequent lines will overwrite them. + # + # If the previous result was longer than this one, we need to print + # some extra spaces to overwrite the previous result + overwrite_chars = ' ' * (num_chars_printed - len(transcript)) + + if not result.is_final: + sys.stdout.write(transcript + overwrite_chars + '\r') + sys.stdout.flush() + + num_chars_printed = len(transcript) + else: + print(transcript + overwrite_chars) + + # Exit recognition if any of the transcribed phrases could be + # one of our keywords. + if re.search(r'\b(exit|quit)\b', transcript, re.I): + print('Exiting..') + break + + num_chars_printed = 0 + + # Keep track of what transcripts we've received, so we can resume + # intelligently when we hit the deadline + stream.on_transcribe(duration_to_secs( + top_alternative.words[-1].end_time)) + + +def main(): + # See http://g.co/cloud/speech/docs/languages + # for a list of supported languages. + language_code = 'en-US' # a BCP-47 language tag + + client = speech.SpeechClient() + config = types.RecognitionConfig( + encoding=enums.RecognitionConfig.AudioEncoding.LINEAR16, + sample_rate_hertz=RATE, + language_code=language_code, + max_alternatives=1, + enable_word_time_offsets=True) + streaming_config = types.StreamingRecognitionConfig( + config=config, + interim_results=True) + + with MicrophoneStream(RATE, CHUNK) as stream: + resume = False + while True: + audio_generator = stream.generator(resume=resume) + requests = (types.StreamingRecognizeRequest(audio_content=content) + for content in audio_generator) + + responses = client.streaming_recognize( + streaming_config, requests, + options=gax.CallOptions(timeout=(60 * 4))) + + try: + # Now, put the transcription responses to use. + listen_print_loop(responses, stream) + break + except grpc.RpcError, e: # TODO: wrong exception + if e.code() != grpc.StatusCode.INVALID_ARGUMENT: + raise + + details = e.details() + if 'deadline too short' not in details: + raise + + print('Resuming..') + resume = True + + +if __name__ == '__main__': + main() From 1c71119d2a3d017d95e3adc55ddc48e0b4551619 Mon Sep 17 00:00:00 2001 From: Jerjou Cheng Date: Fri, 6 Oct 2017 15:36:54 -0700 Subject: [PATCH 2/8] Clean up & refactor of indefinite speech transcrib --- .../transcribe_streaming_indefinite.py | 252 +++++++++--------- .../cloud-client/transcribe_streaming_mic.py | 45 ++-- 2 files changed, 143 insertions(+), 154 deletions(-) diff --git a/speech/cloud-client/transcribe_streaming_indefinite.py b/speech/cloud-client/transcribe_streaming_indefinite.py index cae1b624ca07..8286b6ba4ede 100644 --- a/speech/cloud-client/transcribe_streaming_indefinite.py +++ b/speech/cloud-client/transcribe_streaming_indefinite.py @@ -28,10 +28,13 @@ # [START import_libraries] from __future__ import division +import argparse import collections import itertools import re import sys +import threading +import time from google.cloud import speech from google.cloud.speech import enums @@ -40,64 +43,32 @@ import grpc import pyaudio from six.moves import queue +import six + +import transcribe_streaming_mic # [END import_libraries] -# Audio recording parameters -RATE = 16000 -CHUNK = int(RATE / 10) # 100ms + +def duration_to_secs(duration): + return duration.seconds + (duration.nanos / float(1e9)) -class MicrophoneStream(object): +class ResumableMicrophoneStream(transcribe_streaming_mic.MicrophoneStream): """Opens a recording stream as a generator yielding the audio chunks.""" def __init__(self, rate, chunk_size, max_replay_secs=5): - self._rate = rate - self._chunk_size = chunk_size + super(ResumableMicrophoneStream, self).__init__(rate, chunk_size) self._max_replay_secs = max_replay_secs - # Create a thread-safe buffer of audio data - self._buff = queue.Queue() - self.closed = True - - def __enter__(self): - num_channels = 1 - self._audio_interface = pyaudio.PyAudio() - self._audio_stream = self._audio_interface.open( - format=pyaudio.paInt16, - # The API currently only supports 1-channel (mono) audio - # https://goo.gl/z757pE - channels=num_channels, rate=self._rate, - input=True, frames_per_buffer=self._chunk_size, - # Run the audio stream asynchronously to fill the buffer object. - # This is necessary so that the input device's buffer doesn't - # overflow while the calling thread makes network requests, etc. - stream_callback=self._fill_buffer, - ) - - self.closed = False + # Some useful numbers + # 2 bytes in 16 bit samples + self._bytes_per_sample = 2 * self._num_channels + self._bytes_per_second = self._rate * self._bytes_per_sample - bytes_per_sample = 2 * num_channels # 2 bytes in 16 bit samples - self._bytes_per_second = self._rate * bytes_per_sample - - bytes_per_chunk = (self._chunk_size * bytes_per_sample) - chunks_per_second = self._bytes_per_second / bytes_per_chunk + self._bytes_per_chunk = (self._chunk_size * self._bytes_per_sample) + self._chunks_per_second = ( + self._bytes_per_second / self._bytes_per_chunk) self._untranscribed = collections.deque( - maxlen=self._max_replay_secs * chunks_per_second) - - return self - - def __exit__(self, type, value, traceback): - self._audio_stream.stop_stream() - self._audio_stream.close() - self.closed = True - # Signal the generator to terminate so that the client's - # streaming_recognize method will not block the process termination. - self._buff.put(None) - self._audio_interface.terminate() - - def _fill_buffer(self, in_data, frame_count, time_info, status_flags): - """Continuously collect data from the audio stream, into the buffer.""" - self._buff.put(in_data) - return None, pyaudio.paContinue + maxlen=self._max_replay_secs * self._chunks_per_second) def on_transcribe(self, end_time): while self._untranscribed and end_time > self._untranscribed[0][1]: @@ -106,104 +77,104 @@ def on_transcribe(self, end_time): def generator(self, resume=False): total_bytes_sent = 0 if resume: + # Make a copy, in case on_transcribe is called while yielding them + catchup = list(self._untranscribed) # Yield all the untranscribed chunks first - for chunk, _ in self._untranscribed: + for chunk, _ in catchup: yield chunk - while not self.closed: - # Use a blocking get() to ensure there's at least one chunk of - # data, and stop iteration if the chunk is None, indicating the - # end of the audio stream. - chunk = self._buff.get() - if chunk is None: - return - data = [chunk] - - # Now consume whatever other data's still buffered. - while True: - try: - chunk = self._buff.get(block=False) - if chunk is None: - return - data.append(chunk) - except queue.Empty: - break - - byte_data = b''.join(data) + for byte_data in super(ResumableMicrophoneStream, self).generator(): # Populate the replay buffer of untranscribed audio bytes total_bytes_sent += len(byte_data) chunk_end_time = total_bytes_sent / self._bytes_per_second self._untranscribed.append((byte_data, chunk_end_time)) yield byte_data -# [END audio_stream] -def duration_to_secs(duration): - return duration.seconds + (duration.nanos / float(1e9)) +class SimulatedMicrophoneStream(ResumableMicrophoneStream): + def __init__(self, audio_src, *args, **kwargs): + super(SimulatedMicrophoneStream, self).__init__(*args, **kwargs) + self._audio_src = audio_src + def _delayed(self, get_data): + total_bytes_read = 0 + start_time = time.time() -def listen_print_loop(responses, stream): - """Iterates through server responses and prints them. + chunk = get_data(self._bytes_per_chunk) - The responses passed is a generator that will block until a response - is provided by the server. + while chunk and not self.closed: + total_bytes_read += len(chunk) + expected_yield_time = start_time + ( + total_bytes_read / self._bytes_per_second) + now = time.time() + if expected_yield_time > now: + time.sleep(expected_yield_time - now) - Each response may contain multiple results, and each result may contain - multiple alternatives; for details, see https://goo.gl/tjCPAU. Here we - print only the transcription for the top alternative of the top result. + yield chunk - In this case, responses are provided for interim results as well. If the - response is an interim one, print a line feed at the end of it, to allow - the next result to overwrite it, until the response is a final one. For the - final one, print a newline to preserve the finalized transcription. - """ - num_chars_printed = 0 - for response in responses: - if not response.results: - continue - - # The `results` list is consecutive. For streaming, we only care about - # the first result being considered, since once it's `is_final`, it - # moves on to considering the next utterance. - result = response.results[0] - if not result.alternatives: - continue - - top_alternative = result.alternatives[0] - # Display the transcription of the top alternative. - transcript = top_alternative.transcript - - # Display interim results, but with a carriage return at the end of the - # line, so subsequent lines will overwrite them. - # - # If the previous result was longer than this one, we need to print - # some extra spaces to overwrite the previous result - overwrite_chars = ' ' * (num_chars_printed - len(transcript)) - - if not result.is_final: - sys.stdout.write(transcript + overwrite_chars + '\r') - sys.stdout.flush() - - num_chars_printed = len(transcript) - else: - print(transcript + overwrite_chars) - - # Exit recognition if any of the transcribed phrases could be - # one of our keywords. - if re.search(r'\b(exit|quit)\b', transcript, re.I): - print('Exiting..') - break + chunk = get_data(self._bytes_per_chunk) + + def _stream_from_file(self, audio_src): + with open(audio_src, 'rb') as f: + for chunk in self._delayed( + lambda b_per_chunk: f.read(b_per_chunk)): + yield chunk + + # Continue sending silence - 10s worth + trailing_silence = six.StringIO( + b'\0' * self._bytes_per_second * 10) + for chunk in self._delayed(trailing_silence.read): + yield chunk + + def _thread(self): + for chunk in self._stream_from_file(self._audio_src): + self._fill_buffer(chunk) + self._fill_buffer(None) + + def __enter__(self): + self.closed = False + + threading.Thread(target=self._thread).start() + + return self + + def __exit__(self, type, value, traceback): + self.closed = True - num_chars_printed = 0 +def _record_keeper(responses, stream): + """Calls the stream's on_transcribe callback for each final response. + + Args: + responses - a generator of responses. The responses must already be + filtered for ones with results and alternatives. + stream - a ResumableMicrophoneStream. + """ + for r in responses: + result = r.results[0] + if result.is_final: + top_alternative = result.alternatives[0] # Keep track of what transcripts we've received, so we can resume # intelligently when we hit the deadline stream.on_transcribe(duration_to_secs( top_alternative.words[-1].end_time)) + yield r + +def listen_print_loop(responses, stream): + """Iterates through server responses and prints them. -def main(): + Same as in transcribe_streaming_mic, but keeps track of when a sent + audio_chunk has been transcribed. + """ + with_results = (r for r in responses if ( + r.results and r.results[0].alternatives)) + transcribe_streaming_mic.listen_print_loop( + _record_keeper(with_results, stream)) + + +def main(sample_rate, audio_src): # See http://g.co/cloud/speech/docs/languages # for a list of supported languages. language_code = 'en-US' # a BCP-47 language tag @@ -211,7 +182,7 @@ def main(): client = speech.SpeechClient() config = types.RecognitionConfig( encoding=enums.RecognitionConfig.AudioEncoding.LINEAR16, - sample_rate_hertz=RATE, + sample_rate_hertz=sample_rate, language_code=language_code, max_alternatives=1, enable_word_time_offsets=True) @@ -219,32 +190,47 @@ def main(): config=config, interim_results=True) - with MicrophoneStream(RATE, CHUNK) as stream: + if audio_src: + mic_manager = SimulatedMicrophoneStream( + audio_src, sample_rate, int(sample_rate / 10)) + else: + mic_manager = ResumableMicrophoneStream( + sample_rate, int(sample_rate / 10)) + + with mic_manager as stream: resume = False while True: audio_generator = stream.generator(resume=resume) requests = (types.StreamingRecognizeRequest(audio_content=content) for content in audio_generator) - responses = client.streaming_recognize( - streaming_config, requests, - options=gax.CallOptions(timeout=(60 * 4))) + responses = client.streaming_recognize(streaming_config, requests) try: # Now, put the transcription responses to use. listen_print_loop(responses, stream) break - except grpc.RpcError, e: # TODO: wrong exception - if e.code() != grpc.StatusCode.INVALID_ARGUMENT: + except grpc.RpcError, e: + if e.code() not in (grpc.StatusCode.INVALID_ARGUMENT, + grpc.StatusCode.OUT_OF_RANGE): raise - details = e.details() - if 'deadline too short' not in details: - raise + if e.code() == grpc.StatusCode.INVALID_ARGUMENT: + if 'deadline too short' not in details: + raise + else: + if 'maximum allowed stream duration' not in details: + raise print('Resuming..') resume = True if __name__ == '__main__': - main() + parser = argparse.ArgumentParser( + description=__doc__, + formatter_class=argparse.RawDescriptionHelpFormatter) + parser.add_argument('--rate', default=16000, help='Sample rate.', type=int) + parser.add_argument('--audio_src', help='File to simulate streaming of.') + args = parser.parse_args() + main(args.rate, args.audio_src) diff --git a/speech/cloud-client/transcribe_streaming_mic.py b/speech/cloud-client/transcribe_streaming_mic.py index eb6f6ded3dd8..0ce2b20a1674 100644 --- a/speech/cloud-client/transcribe_streaming_mic.py +++ b/speech/cloud-client/transcribe_streaming_mic.py @@ -25,9 +25,9 @@ python transcribe_streaming_mic.py """ -# [START import_libraries] from __future__ import division +import argparse import re import sys @@ -36,39 +36,36 @@ from google.cloud.speech import types import pyaudio from six.moves import queue -# [END import_libraries] - -# Audio recording parameters -RATE = 16000 -CHUNK = int(RATE / 10) # 100ms - class MicrophoneStream(object): """Opens a recording stream as a generator yielding the audio chunks.""" - def __init__(self, rate, chunk): + def __init__(self, rate, chunk_size): self._rate = rate - self._chunk = chunk + self._chunk_size = chunk_size # Create a thread-safe buffer of audio data self._buff = queue.Queue() self.closed = True + # Some useful numbers + self._num_channels = 1 # API only supports mono for now + def __enter__(self): + self.closed = False + self._audio_interface = pyaudio.PyAudio() self._audio_stream = self._audio_interface.open( format=pyaudio.paInt16, # The API currently only supports 1-channel (mono) audio # https://goo.gl/z757pE - channels=1, rate=self._rate, - input=True, frames_per_buffer=self._chunk, + channels=self._num_channels, rate=self._rate, + input=True, frames_per_buffer=self._chunk_size, # Run the audio stream asynchronously to fill the buffer object. # This is necessary so that the input device's buffer doesn't # overflow while the calling thread makes network requests, etc. stream_callback=self._fill_buffer, ) - self.closed = False - return self def __exit__(self, type, value, traceback): @@ -80,7 +77,7 @@ def __exit__(self, type, value, traceback): self._buff.put(None) self._audio_interface.terminate() - def _fill_buffer(self, in_data, frame_count, time_info, status_flags): + def _fill_buffer(self, in_data, *args, **kwargs): """Continuously collect data from the audio stream, into the buffer.""" self._buff.put(in_data) return None, pyaudio.paContinue @@ -106,7 +103,6 @@ def generator(self): break yield b''.join(data) -# [END audio_stream] def listen_print_loop(responses): @@ -137,7 +133,8 @@ def listen_print_loop(responses): continue # Display the transcription of the top alternative. - transcript = result.alternatives[0].transcript + top_alternative = result.alternatives[0] + transcript = top_alternative.transcript # Display interim results, but with a carriage return at the end of the # line, so subsequent lines will overwrite them. @@ -164,7 +161,7 @@ def listen_print_loop(responses): num_chars_printed = 0 -def main(): +def main(sample_rate): # See http://g.co/cloud/speech/docs/languages # for a list of supported languages. language_code = 'en-US' # a BCP-47 language tag @@ -172,13 +169,14 @@ def main(): client = speech.SpeechClient() config = types.RecognitionConfig( encoding=enums.RecognitionConfig.AudioEncoding.LINEAR16, - sample_rate_hertz=RATE, - language_code=language_code) + sample_rate_hertz=sample_rate, + language_code=language_code, + max_alternatives=1) streaming_config = types.StreamingRecognitionConfig( config=config, interim_results=True) - with MicrophoneStream(RATE, CHUNK) as stream: + with MicrophoneStream(sample_rate, int(sample_rate / 10)) as stream: audio_generator = stream.generator() requests = (types.StreamingRecognizeRequest(audio_content=content) for content in audio_generator) @@ -190,4 +188,9 @@ def main(): if __name__ == '__main__': - main() + parser = argparse.ArgumentParser( + description=__doc__, + formatter_class=argparse.RawDescriptionHelpFormatter) + parser.add_argument('--rate', default=16000, help='Sample rate.', type=int) + args = parser.parse_args() + main(args.rate) From 0ba088a7b8497a79b0d32200530a65bf8514ac38 Mon Sep 17 00:00:00 2001 From: Jerjou Cheng Date: Thu, 31 May 2018 10:27:59 -0700 Subject: [PATCH 3/8] Make sure chunks_per_second is a whole number. --- speech/cloud-client/transcribe_streaming_indefinite.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/speech/cloud-client/transcribe_streaming_indefinite.py b/speech/cloud-client/transcribe_streaming_indefinite.py index 8286b6ba4ede..ab7b7a57a1d3 100644 --- a/speech/cloud-client/transcribe_streaming_indefinite.py +++ b/speech/cloud-client/transcribe_streaming_indefinite.py @@ -39,7 +39,6 @@ from google.cloud import speech from google.cloud.speech import enums from google.cloud.speech import types -from google import gax import grpc import pyaudio from six.moves import queue @@ -66,7 +65,7 @@ def __init__(self, rate, chunk_size, max_replay_secs=5): self._bytes_per_chunk = (self._chunk_size * self._bytes_per_sample) self._chunks_per_second = ( - self._bytes_per_second / self._bytes_per_chunk) + self._bytes_per_second // self._bytes_per_chunk) self._untranscribed = collections.deque( maxlen=self._max_replay_secs * self._chunks_per_second) From b0bd8e9ce6b1535c81715b35fed05cef2aa37cba Mon Sep 17 00:00:00 2001 From: Jerjou Cheng Date: Thu, 31 May 2018 10:42:53 -0700 Subject: [PATCH 4/8] Update for google-cloud-python client lib. --- speech/cloud-client/requirements.txt | 1 + .../transcribe_streaming_indefinite.py | 21 ++++--------------- 2 files changed, 5 insertions(+), 17 deletions(-) diff --git a/speech/cloud-client/requirements.txt b/speech/cloud-client/requirements.txt index 87b74e0d7334..fb4f74c65cda 100644 --- a/speech/cloud-client/requirements.txt +++ b/speech/cloud-client/requirements.txt @@ -1 +1,2 @@ google-cloud-speech==0.33.0 +google-api-core==1.2.0 diff --git a/speech/cloud-client/transcribe_streaming_indefinite.py b/speech/cloud-client/transcribe_streaming_indefinite.py index ab7b7a57a1d3..d1761d50c038 100644 --- a/speech/cloud-client/transcribe_streaming_indefinite.py +++ b/speech/cloud-client/transcribe_streaming_indefinite.py @@ -30,18 +30,13 @@ import argparse import collections -import itertools -import re -import sys import threading import time from google.cloud import speech from google.cloud.speech import enums from google.cloud.speech import types -import grpc -import pyaudio -from six.moves import queue +from google.api_core import exceptions import six import transcribe_streaming_mic @@ -209,18 +204,10 @@ def main(sample_rate, audio_src): # Now, put the transcription responses to use. listen_print_loop(responses, stream) break - except grpc.RpcError, e: - if e.code() not in (grpc.StatusCode.INVALID_ARGUMENT, - grpc.StatusCode.OUT_OF_RANGE): + except (exceptions.OutOfRange, exceptions.InvalidArgument) as e: + if not ('maximum allowed stream duration' in e.message or + 'deadline too short' in e.message): raise - details = e.details() - if e.code() == grpc.StatusCode.INVALID_ARGUMENT: - if 'deadline too short' not in details: - raise - else: - if 'maximum allowed stream duration' not in details: - raise - print('Resuming..') resume = True From b25330a4dac80be72efb1705aed9e1f305c0b9f7 Mon Sep 17 00:00:00 2001 From: nnegrey Date: Mon, 17 Dec 2018 16:06:01 -0800 Subject: [PATCH 5/8] Update sample to not error out, but make a new request every ~60ish seconds --- speech/cloud-client/requirements.txt | 5 +- .../transcribe_streaming_indefinite.py | 284 ++++++++++-------- 2 files changed, 168 insertions(+), 121 deletions(-) diff --git a/speech/cloud-client/requirements.txt b/speech/cloud-client/requirements.txt index fb4f74c65cda..dacd90aef860 100644 --- a/speech/cloud-client/requirements.txt +++ b/speech/cloud-client/requirements.txt @@ -1,2 +1,3 @@ -google-cloud-speech==0.33.0 -google-api-core==1.2.0 +google-cloud-speech==0.36.0 +pyaudio==0.2.11 +six==1.12.0 \ No newline at end of file diff --git a/speech/cloud-client/transcribe_streaming_indefinite.py b/speech/cloud-client/transcribe_streaming_indefinite.py index d1761d50c038..04630638d023 100644 --- a/speech/cloud-client/transcribe_streaming_indefinite.py +++ b/speech/cloud-client/transcribe_streaming_indefinite.py @@ -1,6 +1,6 @@ #!/usr/bin/env python -# Copyright 2017 Google Inc. All Rights Reserved. +# Copyright 2018 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -22,38 +22,49 @@ pip install pyaudio Example usage: - python transcribe_streaming_mic.py + python transcribe_streaming_indefinite.py """ -# [START import_libraries] +# [START speech_transcribe_infinite_streaming] from __future__ import division -import argparse import collections -import threading import time +import re +import sys from google.cloud import speech -from google.cloud.speech import enums -from google.cloud.speech import types -from google.api_core import exceptions -import six -import transcribe_streaming_mic -# [END import_libraries] +import pyaudio +from six.moves import queue + +# Audio recording parameters +STREAMING_LIMIT = 55000 +SAMPLE_RATE = 16000 +CHUNK_SIZE = int(SAMPLE_RATE / 10) # 100ms + + +def get_current_time(): + return int(round(time.time() * 1000)) def duration_to_secs(duration): return duration.seconds + (duration.nanos / float(1e9)) -class ResumableMicrophoneStream(transcribe_streaming_mic.MicrophoneStream): +class ResumableMicrophoneStream: """Opens a recording stream as a generator yielding the audio chunks.""" - def __init__(self, rate, chunk_size, max_replay_secs=5): - super(ResumableMicrophoneStream, self).__init__(rate, chunk_size) - self._max_replay_secs = max_replay_secs + def __init__(self): + self._rate = SAMPLE_RATE + self._chunk_size = CHUNK_SIZE + self._num_channels = 1 + self._max_replay_secs = 5 + + # Create a thread-safe buffer of audio data + self._buff = queue.Queue() + self.closed = True + self.start_time = get_current_time() - # Some useful numbers # 2 bytes in 16 bit samples self._bytes_per_sample = 2 * self._num_channels self._bytes_per_second = self._rate * self._bytes_per_sample @@ -64,20 +75,77 @@ def __init__(self, rate, chunk_size, max_replay_secs=5): self._untranscribed = collections.deque( maxlen=self._max_replay_secs * self._chunks_per_second) + def __enter__(self): + self.closed = False + + self._audio_interface = pyaudio.PyAudio() + self._audio_stream = self._audio_interface.open( + format=pyaudio.paInt16, + channels=self._num_channels, + rate=self._rate, + input=True, + frames_per_buffer=self._chunk_size, + # Run the audio stream asynchronously to fill the buffer object. + # This is necessary so that the input device's buffer doesn't + # overflow while the calling thread makes network requests, etc. + stream_callback=self._fill_buffer, + ) + + return self + + def __exit__(self, type, value, traceback): + self._audio_stream.stop_stream() + self._audio_stream.close() + self.closed = True + # Signal the generator to terminate so that the client's + # streaming_recognize method will not block the process termination. + self._buff.put(None) + self._audio_interface.terminate() + + def _fill_buffer(self, in_data, *args, **kwargs): + """Continuously collect data from the audio stream, into the buffer.""" + self._buff.put(in_data) + return None, pyaudio.paContinue + def on_transcribe(self, end_time): while self._untranscribed and end_time > self._untranscribed[0][1]: self._untranscribed.popleft() - def generator(self, resume=False): + def data_generator(self): + while not self.closed: + if get_current_time() - self.start_time > STREAMING_LIMIT: + self.start_time = get_current_time() + break + # Use a blocking get() to ensure there's at least one chunk of + # data, and stop iteration if the chunk is None, indicating the + # end of the audio stream. + chunk = self._buff.get() + if chunk is None: + return + data = [chunk] + + # Now consume whatever other data's still buffered. + while True: + try: + chunk = self._buff.get(block=False) + if chunk is None: + return + data.append(chunk) + except queue.Empty: + break + + yield b''.join(data) + + def generator(self): total_bytes_sent = 0 - if resume: - # Make a copy, in case on_transcribe is called while yielding them - catchup = list(self._untranscribed) - # Yield all the untranscribed chunks first - for chunk, _ in catchup: - yield chunk - - for byte_data in super(ResumableMicrophoneStream, self).generator(): + + # Make a copy, in case on_transcribe is called while yielding them + catchup = list(self._untranscribed) + # Yield all the untranscribed chunks first + for chunk, _ in catchup: + yield chunk + + for byte_data in self.data_generator(): # Populate the replay buffer of untranscribed audio bytes total_bytes_sent += len(byte_data) chunk_end_time = total_bytes_sent / self._bytes_per_second @@ -86,57 +154,6 @@ def generator(self, resume=False): yield byte_data -class SimulatedMicrophoneStream(ResumableMicrophoneStream): - def __init__(self, audio_src, *args, **kwargs): - super(SimulatedMicrophoneStream, self).__init__(*args, **kwargs) - self._audio_src = audio_src - - def _delayed(self, get_data): - total_bytes_read = 0 - start_time = time.time() - - chunk = get_data(self._bytes_per_chunk) - - while chunk and not self.closed: - total_bytes_read += len(chunk) - expected_yield_time = start_time + ( - total_bytes_read / self._bytes_per_second) - now = time.time() - if expected_yield_time > now: - time.sleep(expected_yield_time - now) - - yield chunk - - chunk = get_data(self._bytes_per_chunk) - - def _stream_from_file(self, audio_src): - with open(audio_src, 'rb') as f: - for chunk in self._delayed( - lambda b_per_chunk: f.read(b_per_chunk)): - yield chunk - - # Continue sending silence - 10s worth - trailing_silence = six.StringIO( - b'\0' * self._bytes_per_second * 10) - for chunk in self._delayed(trailing_silence.read): - yield chunk - - def _thread(self): - for chunk in self._stream_from_file(self._audio_src): - self._fill_buffer(chunk) - self._fill_buffer(None) - - def __enter__(self): - self.closed = False - - threading.Thread(target=self._thread).start() - - return self - - def __exit__(self, type, value, traceback): - self.closed = True - - def _record_keeper(responses, stream): """Calls the stream's on_transcribe callback for each final response. @@ -159,64 +176,93 @@ def _record_keeper(responses, stream): def listen_print_loop(responses, stream): """Iterates through server responses and prints them. - Same as in transcribe_streaming_mic, but keeps track of when a sent - audio_chunk has been transcribed. + The responses passed is a generator that will block until a response + is provided by the server. + + Each response may contain multiple results, and each result may contain + multiple alternatives; for details, see https://goo.gl/tjCPAU. Here we + print only the transcription for the top alternative of the top result. + + In this case, responses are provided for interim results as well. If the + response is an interim one, print a line feed at the end of it, to allow + the next result to overwrite it, until the response is a final one. For the + final one, print a newline to preserve the finalized transcription. """ with_results = (r for r in responses if ( r.results and r.results[0].alternatives)) - transcribe_streaming_mic.listen_print_loop( - _record_keeper(with_results, stream)) + responses = _record_keeper(with_results, stream) + + num_chars_printed = 0 + for response in responses: + if not response.results: + continue + + # The `results` list is consecutive. For streaming, we only care about + # the first result being considered, since once it's `is_final`, it + # moves on to considering the next utterance. + result = response.results[0] + if not result.alternatives: + continue + + # Display the transcription of the top alternative. + top_alternative = result.alternatives[0] + transcript = top_alternative.transcript + + # Display interim results, but with a carriage return at the end of the + # line, so subsequent lines will overwrite them. + # + # If the previous result was longer than this one, we need to print + # some extra spaces to overwrite the previous result + overwrite_chars = ' ' * (num_chars_printed - len(transcript)) + + if not result.is_final: + sys.stdout.write(transcript + overwrite_chars + '\r') + sys.stdout.flush() + + num_chars_printed = len(transcript) + else: + print(transcript + overwrite_chars) + + # Exit recognition if any of the transcribed phrases could be + # one of our keywords. + if re.search(r'\b(exit|quit)\b', transcript, re.I): + print('Exiting..') + stream.closed = True + break + + num_chars_printed = 0 -def main(sample_rate, audio_src): - # See http://g.co/cloud/speech/docs/languages - # for a list of supported languages. - language_code = 'en-US' # a BCP-47 language tag +def main(): client = speech.SpeechClient() - config = types.RecognitionConfig( - encoding=enums.RecognitionConfig.AudioEncoding.LINEAR16, - sample_rate_hertz=sample_rate, - language_code=language_code, + config = speech.types.RecognitionConfig( + encoding=speech.enums.RecognitionConfig.AudioEncoding.LINEAR16, + sample_rate_hertz=SAMPLE_RATE, + language_code='en-US', max_alternatives=1, enable_word_time_offsets=True) - streaming_config = types.StreamingRecognitionConfig( + streaming_config = speech.types.StreamingRecognitionConfig( config=config, interim_results=True) - if audio_src: - mic_manager = SimulatedMicrophoneStream( - audio_src, sample_rate, int(sample_rate / 10)) - else: - mic_manager = ResumableMicrophoneStream( - sample_rate, int(sample_rate / 10)) + mic_manager = ResumableMicrophoneStream() - with mic_manager as stream: - resume = False - while True: - audio_generator = stream.generator(resume=resume) - requests = (types.StreamingRecognizeRequest(audio_content=content) - for content in audio_generator) + print('Say "Quit" or "Exit" to terminate the program.') - responses = client.streaming_recognize(streaming_config, requests) + with mic_manager as stream: + while not stream.closed: + audio_generator = stream.generator() + requests = (speech.types.StreamingRecognizeRequest( + audio_content=content) + for content in audio_generator) - try: - # Now, put the transcription responses to use. - listen_print_loop(responses, stream) - break - except (exceptions.OutOfRange, exceptions.InvalidArgument) as e: - if not ('maximum allowed stream duration' in e.message or - 'deadline too short' in e.message): - raise - print('Resuming..') - resume = True + responses = client.streaming_recognize(streaming_config, + requests) + # Now, put the transcription responses to use. + listen_print_loop(responses, stream) if __name__ == '__main__': - parser = argparse.ArgumentParser( - description=__doc__, - formatter_class=argparse.RawDescriptionHelpFormatter) - parser.add_argument('--rate', default=16000, help='Sample rate.', type=int) - parser.add_argument('--audio_src', help='File to simulate streaming of.') - args = parser.parse_args() - main(args.rate, args.audio_src) + main() +# [END speech_transcribe_infinite_streaming] From c8fb08904c5376a10018b74d2449f47a97bf7b7b Mon Sep 17 00:00:00 2001 From: Noah Negrey Date: Mon, 17 Dec 2018 16:14:12 -0800 Subject: [PATCH 6/8] Update transcribe_streaming_mic.py --- speech/cloud-client/transcribe_streaming_mic.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/speech/cloud-client/transcribe_streaming_mic.py b/speech/cloud-client/transcribe_streaming_mic.py index 8c3c8529487a..3ca7b7094124 100644 --- a/speech/cloud-client/transcribe_streaming_mic.py +++ b/speech/cloud-client/transcribe_streaming_mic.py @@ -189,4 +189,4 @@ def main(): if __name__ == '__main__': main() -# [END speech_transcribe_streaming_mic] \ No newline at end of file +# [END speech_transcribe_streaming_mic] From 0b009fb4bbafe18053173bec03fd3aca12802ad1 Mon Sep 17 00:00:00 2001 From: nnegrey Date: Tue, 18 Dec 2018 16:16:06 -0800 Subject: [PATCH 7/8] Clean up unnecessary code, since we no longer wait for it to error out --- .../transcribe_streaming_indefinite.py | 49 +------------------ 1 file changed, 2 insertions(+), 47 deletions(-) diff --git a/speech/cloud-client/transcribe_streaming_indefinite.py b/speech/cloud-client/transcribe_streaming_indefinite.py index 04630638d023..24ee6ea87e43 100644 --- a/speech/cloud-client/transcribe_streaming_indefinite.py +++ b/speech/cloud-client/transcribe_streaming_indefinite.py @@ -28,7 +28,6 @@ # [START speech_transcribe_infinite_streaming] from __future__ import division -import collections import time import re import sys @@ -72,8 +71,6 @@ def __init__(self): self._bytes_per_chunk = (self._chunk_size * self._bytes_per_sample) self._chunks_per_second = ( self._bytes_per_second // self._bytes_per_chunk) - self._untranscribed = collections.deque( - maxlen=self._max_replay_secs * self._chunks_per_second) def __enter__(self): self.closed = False @@ -107,11 +104,7 @@ def _fill_buffer(self, in_data, *args, **kwargs): self._buff.put(in_data) return None, pyaudio.paContinue - def on_transcribe(self, end_time): - while self._untranscribed and end_time > self._untranscribed[0][1]: - self._untranscribed.popleft() - - def data_generator(self): + def generator(self): while not self.closed: if get_current_time() - self.start_time > STREAMING_LIMIT: self.start_time = get_current_time() @@ -136,42 +129,6 @@ def data_generator(self): yield b''.join(data) - def generator(self): - total_bytes_sent = 0 - - # Make a copy, in case on_transcribe is called while yielding them - catchup = list(self._untranscribed) - # Yield all the untranscribed chunks first - for chunk, _ in catchup: - yield chunk - - for byte_data in self.data_generator(): - # Populate the replay buffer of untranscribed audio bytes - total_bytes_sent += len(byte_data) - chunk_end_time = total_bytes_sent / self._bytes_per_second - self._untranscribed.append((byte_data, chunk_end_time)) - - yield byte_data - - -def _record_keeper(responses, stream): - """Calls the stream's on_transcribe callback for each final response. - - Args: - responses - a generator of responses. The responses must already be - filtered for ones with results and alternatives. - stream - a ResumableMicrophoneStream. - """ - for r in responses: - result = r.results[0] - if result.is_final: - top_alternative = result.alternatives[0] - # Keep track of what transcripts we've received, so we can resume - # intelligently when we hit the deadline - stream.on_transcribe(duration_to_secs( - top_alternative.words[-1].end_time)) - yield r - def listen_print_loop(responses, stream): """Iterates through server responses and prints them. @@ -188,11 +145,9 @@ def listen_print_loop(responses, stream): the next result to overwrite it, until the response is a final one. For the final one, print a newline to preserve the finalized transcription. """ - with_results = (r for r in responses if ( + responses = (r for r in responses if ( r.results and r.results[0].alternatives)) - responses = _record_keeper(with_results, stream) - num_chars_printed = 0 for response in responses: if not response.results: From f576f9e26d5ea67f794f5a51399b7cf7313fbe2b Mon Sep 17 00:00:00 2001 From: nnegrey Date: Tue, 18 Dec 2018 16:16:52 -0800 Subject: [PATCH 8/8] Update based on feedback --- speech/cloud-client/transcribe_streaming_indefinite.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/speech/cloud-client/transcribe_streaming_indefinite.py b/speech/cloud-client/transcribe_streaming_indefinite.py index 24ee6ea87e43..f1adb2247f13 100644 --- a/speech/cloud-client/transcribe_streaming_indefinite.py +++ b/speech/cloud-client/transcribe_streaming_indefinite.py @@ -53,9 +53,9 @@ def duration_to_secs(duration): class ResumableMicrophoneStream: """Opens a recording stream as a generator yielding the audio chunks.""" - def __init__(self): - self._rate = SAMPLE_RATE - self._chunk_size = CHUNK_SIZE + def __init__(self, rate, chunk_size): + self._rate = rate + self._chunk_size = chunk_size self._num_channels = 1 self._max_replay_secs = 5 @@ -201,7 +201,7 @@ def main(): config=config, interim_results=True) - mic_manager = ResumableMicrophoneStream() + mic_manager = ResumableMicrophoneStream(SAMPLE_RATE, CHUNK_SIZE) print('Say "Quit" or "Exit" to terminate the program.')