-
Notifications
You must be signed in to change notification settings - Fork 500
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
i2s audio examples #916
i2s audio examples #916
Conversation
Nice work, if we can bash the rough edges off then WavPlayer or "audio.py" can be baked into the MicroPython builds to avoid needing to haul it around with the examples. There are some gotchas outstanding though - These examples really like to explode with a burst of random-memory-played-as-audio when I hit "Stop" in Thonny. I can't seem to figure out how to fix that. Also the POP sound when an audio file stops playing shouldn't happen, afaik, the transition to "silence" should be pretty much instant, and appending silence to a wav file causes it to... not pop, as you might expect. I tried heavily modifying audio.py to remove the silence buffer and use the playback buffer instead, queuing up the wav file and appending silence if there's a buffer underrun- # SPDX-FileCopyrightText: 2023 Christopher Parrott for Pimoroni Ltd
#
# SPDX-License-Identifier: MIT
import os
import math
import struct
from machine import I2S, Pin
"""
A class for playing Wav files out of an I2S audio amp. It can also play pure tones.
This code is based heavily on the work of Mike Teachman, at:
https://github.com/miketeachman/micropython-i2s-examples/blob/master/examples/wavplayer.py
"""
class WavPlayer:
# Internal states
PLAY = 0
PAUSE = 1
FLUSH = 2
STOP = 3
NONE = 4
MODE_WAV = 0
MODE_TONE = 1
# Default buffer length
WAV_BUFFER_LENGTH = 10000
INTERNAL_BUFFER_LENGTH = 10000
TONE_SAMPLE_RATE = 44_100
TONE_BITS_PER_SAMPLE = 16
TONE_FULL_WAVES = 2
def __init__(self, id, sck_pin, ws_pin, sd_pin, amp_enable=None, ibuf_len=INTERNAL_BUFFER_LENGTH, root="/"):
self.__id = id
self.__sck_pin = sck_pin
self.__ws_pin = ws_pin
self.__sd_pin = sd_pin
self.__ibuf_len = ibuf_len
self.__enable = None
if amp_enable is not None:
self.__enable = Pin(amp_enable, Pin.OUT)
# Set the directory to search for files in
self.set_root(root)
self.__state = WavPlayer.NONE
self.__mode = WavPlayer.MODE_WAV
self.__wav_file = None
self.__loop_wav = False
self.__first_sample_offset = None
self.__flush_count = 0
self.__audio_out = None
# Allocate a larger array for WAV audio samples, using a memoryview for more efficient access
self.__wav_samples_mv = memoryview(bytearray(self.WAV_BUFFER_LENGTH))
# Reserve a variable for audio samples used for tones
self.__tone_samples = None
self.__queued_samples = None
def set_root(self, root):
self.__root = root.rstrip("/") + "/"
def play_wav(self, wav_file, loop=False):
if os.listdir(self.__root).count(wav_file) == 0:
raise ValueError(f"'{wav_file}' not found")
self.__stop_i2s() # Stop any active playback and terminate the I2S instance
self.__wav_file = open(self.__root + wav_file, "rb") # Open the chosen WAV file in read-only, binary mode
self.__loop_wav = loop # Record if the user wants the file to loop
# Parse the WAV file, returning the necessary parameters to initialise I2S communication
format, sample_rate, bits_per_sample, self.__first_sample_offset = WavPlayer.__parse_wav(self.__wav_file)
self.__wav_file.seek(self.__first_sample_offset) # Advance to first byte of sample data
self.__start_i2s(bits=bits_per_sample,
format=format,
rate=sample_rate,
state=WavPlayer.PLAY,
mode=WavPlayer.MODE_WAV)
def play_tone(self, frequency, amplitude):
if frequency < 20.0 or frequency > 20_000:
raise ValueError("frequency out of range. Expected between 20Hz and 20KHz")
if amplitude < 0.0 or amplitude > 1.0:
raise ValueError("amplitude out of range. Expected 0.0 to 1.0")
# Create a buffer containing the pure tone samples
samples_per_cycle = self.TONE_SAMPLE_RATE // frequency
sample_size_in_bytes = self.TONE_BITS_PER_SAMPLE // 8
samples = bytearray(self.TONE_FULL_WAVES * samples_per_cycle * sample_size_in_bytes)
range = pow(2, self.TONE_BITS_PER_SAMPLE) // 2
format = "<h" if self.TONE_BITS_PER_SAMPLE == 16 else "<l"
# Populate the buffer with multiple cycles to avoid it completing too quickly and causing drop outs
for i in range(samples_per_cycle * self.TONE_FULL_WAVES):
sample = int((range - 1) * (math.sin(2 * math.pi * i / samples_per_cycle)) * amplitude)
struct.pack_into(format, samples, i * sample_size_in_bytes, sample)
# Are we not already playing tones?
if not (self.__mode == WavPlayer.MODE_TONE and (self.__state == WavPlayer.PLAY or self.__state == WavPlayer.PAUSE)):
self.__stop_i2s() # Stop any active playback and terminate the I2S instance
self.__tone_samples = samples
self.__start_i2s(bits=self.TONE_BITS_PER_SAMPLE,
format=I2S.MONO,
rate=self.TONE_SAMPLE_RATE,
state=WavPlayer.PLAY,
mode=WavPlayer.MODE_TONE)
else:
self.__queued_samples = samples
self.__state = WavPlayer.PLAY
def pause(self):
if self.__state == WavPlayer.PLAY:
self.__state = WavPlayer.PAUSE # Enter the pause state on the next callback
def resume(self):
if self.__state == WavPlayer.PAUSE:
self.__state = WavPlayer.PLAY # Enter the play state on the next callback
def stop(self):
if self.__state == WavPlayer.PLAY or self.__state == WavPlayer.PAUSE:
if self.__mode == WavPlayer.MODE_WAV:
self.__wav_file.close()
else:
self.__state = WavPlayer.STOP
def is_playing(self):
return self.__state != WavPlayer.NONE and self.__state != WavPlayer.STOP
def is_paused(self):
return self.__state == WavPlayer.PAUSE
def __start_i2s(self, bits=16, format=I2S.MONO, rate=44_100, state=STOP, mode=MODE_WAV):
import gc
gc.collect()
self.__audio_out = I2S(
self.__id,
sck=self.__sck_pin,
ws=self.__ws_pin,
sd=self.__sd_pin,
mode=I2S.TX,
bits=bits,
format=format,
rate=rate,
ibuf=self.__ibuf_len,
)
print(f"Bits per sample: {bits}, format: {format}, rate: {rate}")
self.__state = state
self.__mode = mode
self.__audio_out.irq(self.__i2s_callback)
self.__silence()
if self.__enable is not None:
self.__enable.on()
def __stop_i2s(self):
self.stop() # Stop any active playback
while self.is_playing(): # and wait for it to complete
pass
if self.__enable is not None:
self.__enable.off()
if self.__audio_out is not None:
self.__audio_out.deinit() # Deinit any active I2S comms
self.__state == WavPlayer.NONE # Return to the none state
def __silence(self):
# Generate silence, we probably just want to do this once
for n in range(1024):
self.__wav_samples_mv[n] = 0
self.__audio_out.write(self.__wav_samples_mv[:1024]) # Play silence
def __i2s_callback(self, arg):
# PLAY
if self.__state == WavPlayer.PLAY:
if self.__mode == WavPlayer.MODE_WAV:
try:
num_read = self.__wav_file.readinto(self.__wav_samples_mv) # Read the next section of the WAV file
except ValueError:
self.__state = WavPlayer.STOP
self.__silence()
return
# End of file, be it 0 or < target buffer
# Or possibly a wav file with fewer samples than the output buffer size
if num_read < self.__ibuf_len:
# If we're looping, seek to the start of the file and append the
# samples into the playback buffer-
if self.__loop_wav:
_ = self.__wav_file.seek(self.__first_sample_offset) # Play again, so advance to first byte of sample data
self.__wav_file.readinto(self.__wav_samples_mv[num_read:])
self.__audio_out.write(self.__wav_samples_mv)
return
# ^ Early return for looped layback
# v Fill full buffer for single-shot playback
extra_bytes_needed = self.__ibuf_len - num_read
for n in range(extra_bytes_needed):
self.__wav_samples_mv[num_read + n] = 0
self.__audio_out.write(self.__wav_samples_mv)
self.__wav_file.close() # Stop playing, so close the file
self.__state = WavPlayer.STOP # and enter the flush state on the next callback # Read 0 bytes, so we're clean off the end of the wav buffer
else:
self.__audio_out.write(self.__wav_samples_mv) # Write out our full buffer
else:
if self.__queued_samples is not None:
self.__tone_samples = self.__queued_samples
self.__queued_samples = None
self.__audio_out.write(self.__tone_samples)
# PAUSE or STOP
elif self.__state == WavPlayer.PAUSE or self.__state == WavPlayer.STOP:
self.__silence()
# NONE
elif self.__state == WavPlayer.NONE:
pass
@staticmethod
def __parse_wav(wav_file):
chunk_ID = wav_file.read(4)
if chunk_ID != b"RIFF":
raise ValueError("WAV chunk ID invalid")
_ = wav_file.read(4) # chunk_size
format = wav_file.read(4)
if format != b"WAVE":
raise ValueError("WAV format invalid")
sub_chunk1_ID = wav_file.read(4)
if sub_chunk1_ID != b"fmt ":
raise ValueError("WAV sub chunk 1 ID invalid")
_ = wav_file.read(4) # sub_chunk1_size
_ = struct.unpack("<H", wav_file.read(2))[0] # audio_format
num_channels = struct.unpack("<H", wav_file.read(2))[0]
if num_channels == 1:
format = I2S.MONO
else:
format = I2S.STEREO
sample_rate = struct.unpack("<I", wav_file.read(4))[0]
# if sample_rate != 44_100 and sample_rate != 48_000:
# raise ValueError(f"WAV sample rate of {sample_rate} invalid. Only 44.1KHz or 48KHz audio are supported")
_ = struct.unpack("<I", wav_file.read(4))[0] # byte_rate
_ = struct.unpack("<H", wav_file.read(2))[0] # block_align
bits_per_sample = struct.unpack("<H", wav_file.read(2))[0]
# usually the sub chunk2 ID ("data") comes next, but
# some online MP3->WAV converters add
# binary data before "data". So, read a fairly large
# block of bytes and search for "data".
binary_block = wav_file.read(200)
offset = binary_block.find(b"data")
if offset == -1:
raise ValueError("WAV sub chunk 2 ID not found")
return (format, sample_rate, bits_per_sample, 44 + offset)
def __del__(self):
if self.__enable is not None:
self.__enable.off()
if self.__audio_out is not None:
self.__audio_out.deinit() # Deinit any active I2S comms This, however, does not fix the pop. And there's even a pop between loops if I try looped playback. That's particularly surprising, since appending silence to the existing playback buffer should not be any different to that silence coming from the wav file in the first place... Intuitively I would expect this problem to be a signedness mismatch between the silence and the audio file, but that does not seem to be the case. Appending silence to the audio works just fine and then inspecting that silence in a hex editor reveals the string of zeros it should be (though notably if you have 32bit float selected in Audacity you'll actually get a low amplitude, high frequency noise pattern due to quantisation error or something when exporting to s16.) Finally just to be super, super weird I'm pretty just the first playback of doorbell.wav is normal speed, and subsequent playbacks are slightly sped up. Wut? |
Here's how a Wav handling class might look if it kept track of bytes remaining and skipped over the metadata at the end- # SPDX-FileCopyrightText: 2023 Christopher Parrott for Pimoroni Ltd
#
# SPDX-License-Identifier: MIT
import os
import math
import struct
from machine import I2S, Pin
"""
A class for playing Wav files out of an I2S audio amp. It can also play pure tones.
This code is based heavily on the work of Mike Teachman, at:
https://github.com/miketeachman/micropython-i2s-examples/blob/master/examples/wavplayer.py
"""
class WavPlayer:
# Internal states
PLAY = 0
PAUSE = 1
FLUSH = 2
STOP = 3
NONE = 4
MODE_WAV = 0
MODE_TONE = 1
# Default buffer length
WAV_BUFFER_LENGTH = 10000
INTERNAL_BUFFER_LENGTH = 10000
TONE_SAMPLE_RATE = 44_100
TONE_BITS_PER_SAMPLE = 16
TONE_FULL_WAVES = 2
def __init__(self, id, sck_pin, ws_pin, sd_pin, amp_enable=None, ibuf_len=INTERNAL_BUFFER_LENGTH, root="/"):
self.__id = id
self.__sck_pin = sck_pin
self.__ws_pin = ws_pin
self.__sd_pin = sd_pin
self.__ibuf_len = ibuf_len
self.__enable = None
if amp_enable is not None:
self.__enable = Pin(amp_enable, Pin.OUT)
# Set the directory to search for files in
self.set_root(root)
self.__state = WavPlayer.NONE
self.__mode = WavPlayer.MODE_WAV
self.__wav_file = None
self.__loop_wav = False
self.__first_sample_offset = None
self.__flush_count = 0
self.__audio_out = None
# Allocate a larger array for WAV audio samples, using a memoryview for more efficient access
self.__wav_samples_mv = memoryview(bytearray(self.WAV_BUFFER_LENGTH))
# Reserve a variable for audio samples used for tones
self.__tone_samples = None
self.__queued_samples = None
def set_root(self, root):
self.__root = root.rstrip("/") + "/"
def play_wav(self, wav_file, loop=False):
if os.listdir(self.__root).count(wav_file) == 0:
raise ValueError(f"'{wav_file}' not found")
self.__stop_i2s() # Stop any active playback and terminate the I2S instance
self.__wav_file = open(self.__root + wav_file, "rb") # Open the chosen WAV file in read-only, binary mode
self.__loop_wav = loop # Record if the user wants the file to loop
# Parse the WAV file, returning the necessary parameters to initialise I2S communication
format, sample_rate, bits_per_sample, self.__first_sample_offset = WavPlayer.__parse_wav(self.__wav_file)
self.__wav_file.seek(self.__first_sample_offset) # Advance to first byte of sample data
self.__start_i2s(bits=bits_per_sample,
format=format,
rate=sample_rate,
state=WavPlayer.PLAY,
mode=WavPlayer.MODE_WAV)
def play_tone(self, frequency, amplitude):
if frequency < 20.0 or frequency > 20_000:
raise ValueError("frequency out of range. Expected between 20Hz and 20KHz")
if amplitude < 0.0 or amplitude > 1.0:
raise ValueError("amplitude out of range. Expected 0.0 to 1.0")
# Create a buffer containing the pure tone samples
samples_per_cycle = self.TONE_SAMPLE_RATE // frequency
sample_size_in_bytes = self.TONE_BITS_PER_SAMPLE // 8
samples = bytearray(self.TONE_FULL_WAVES * samples_per_cycle * sample_size_in_bytes)
range = pow(2, self.TONE_BITS_PER_SAMPLE) // 2
format = "<h" if self.TONE_BITS_PER_SAMPLE == 16 else "<l"
# Populate the buffer with multiple cycles to avoid it completing too quickly and causing drop outs
for i in range(samples_per_cycle * self.TONE_FULL_WAVES):
sample = int((range - 1) * (math.sin(2 * math.pi * i / samples_per_cycle)) * amplitude)
struct.pack_into(format, samples, i * sample_size_in_bytes, sample)
# Are we not already playing tones?
if not (self.__mode == WavPlayer.MODE_TONE and (self.__state == WavPlayer.PLAY or self.__state == WavPlayer.PAUSE)):
self.__stop_i2s() # Stop any active playback and terminate the I2S instance
self.__tone_samples = samples
self.__start_i2s(bits=self.TONE_BITS_PER_SAMPLE,
format=I2S.MONO,
rate=self.TONE_SAMPLE_RATE,
state=WavPlayer.PLAY,
mode=WavPlayer.MODE_TONE)
else:
self.__queued_samples = samples
self.__state = WavPlayer.PLAY
def pause(self):
if self.__state == WavPlayer.PLAY:
self.__state = WavPlayer.PAUSE # Enter the pause state on the next callback
def resume(self):
if self.__state == WavPlayer.PAUSE:
self.__state = WavPlayer.PLAY # Enter the play state on the next callback
def stop(self):
if self.__state == WavPlayer.PLAY or self.__state == WavPlayer.PAUSE:
if self.__mode == WavPlayer.MODE_WAV:
self.__wav_file.close()
else:
self.__state = WavPlayer.STOP
def is_playing(self):
return self.__state != WavPlayer.NONE and self.__state != WavPlayer.STOP
def is_paused(self):
return self.__state == WavPlayer.PAUSE
def __start_i2s(self, bits=16, format=I2S.MONO, rate=44_100, state=STOP, mode=MODE_WAV):
import gc
gc.collect()
self.__audio_out = I2S(
self.__id,
sck=self.__sck_pin,
ws=self.__ws_pin,
sd=self.__sd_pin,
mode=I2S.TX,
bits=bits,
format=format,
rate=rate,
ibuf=self.__ibuf_len,
)
print(f"Bits per sample: {bits}, format: {format}, rate: {rate}")
self.__state = state
self.__mode = mode
self.__audio_out.irq(self.__i2s_callback)
self.__silence()
if self.__enable is not None:
self.__enable.on()
def __stop_i2s(self):
self.stop() # Stop any active playback
while self.is_playing(): # and wait for it to complete
pass
if self.__enable is not None:
self.__enable.off()
if self.__audio_out is not None:
self.__audio_out.deinit() # Deinit any active I2S comms
self.__state == WavPlayer.NONE # Return to the none state
def __silence(self):
# Generate silence, we probably just want to do this once
for n in range(1024):
self.__wav_samples_mv[n] = 0
self.__audio_out.write(self.__wav_samples_mv[:1024]) # Play silence
def __i2s_callback(self, arg):
# PLAY
if self.__state == WavPlayer.PLAY:
if self.__mode == WavPlayer.MODE_WAV:
try:
num_read = self.__wav_file.readinto(self.__wav_samples_mv) # Read the next section of the WAV file
except ValueError:
self.__state = WavPlayer.STOP
self.__silence()
return
# End of file, be it 0 or < target buffer
# Or possibly a wav file with fewer samples than the output buffer size
if num_read < self.__ibuf_len:
# If we're looping, seek to the start of the file and append the
# samples into the playback buffer-
if self.__loop_wav:
_ = self.__wav_file.seek(self.__first_sample_offset) # Play again, so advance to first byte of sample data
self.__wav_file.readinto(self.__wav_samples_mv[num_read:])
self.__audio_out.write(self.__wav_samples_mv)
return
# ^ Early return for looped layback
# v Fill full buffer for single-shot playback
extra_bytes_needed = self.__ibuf_len - num_read
for n in range(extra_bytes_needed):
self.__wav_samples_mv[num_read + n] = 0
self.__audio_out.write(self.__wav_samples_mv)
self.__wav_file.close() # Stop playing, so close the file
self.__state = WavPlayer.STOP # and enter the flush state on the next callback # Read 0 bytes, so we're clean off the end of the wav buffer
else:
self.__audio_out.write(self.__wav_samples_mv) # Write out our full buffer
else:
if self.__queued_samples is not None:
self.__tone_samples = self.__queued_samples
self.__queued_samples = None
self.__audio_out.write(self.__tone_samples)
# PAUSE or STOP
elif self.__state == WavPlayer.PAUSE or self.__state == WavPlayer.STOP:
self.__silence()
# NONE
elif self.__state == WavPlayer.NONE:
pass
@staticmethod
def __parse_wav(wav_file):
chunk_ID = wav_file.read(4)
if chunk_ID != b"RIFF":
raise ValueError("WAV chunk ID invalid")
_ = wav_file.read(4) # chunk_size
format = wav_file.read(4)
if format != b"WAVE":
raise ValueError("WAV format invalid")
sub_chunk1_ID = wav_file.read(4)
if sub_chunk1_ID != b"fmt ":
raise ValueError("WAV sub chunk 1 ID invalid")
_ = wav_file.read(4) # sub_chunk1_size
_ = struct.unpack("<H", wav_file.read(2))[0] # audio_format
num_channels = struct.unpack("<H", wav_file.read(2))[0]
if num_channels == 1:
format = I2S.MONO
else:
format = I2S.STEREO
sample_rate = struct.unpack("<I", wav_file.read(4))[0]
# if sample_rate != 44_100 and sample_rate != 48_000:
# raise ValueError(f"WAV sample rate of {sample_rate} invalid. Only 44.1KHz or 48KHz audio are supported")
_ = struct.unpack("<I", wav_file.read(4))[0] # byte_rate
_ = struct.unpack("<H", wav_file.read(2))[0] # block_align
bits_per_sample = struct.unpack("<H", wav_file.read(2))[0]
# usually the sub chunk2 ID ("data") comes next, but
# some online MP3->WAV converters add
# binary data before "data". So, read a fairly large
# block of bytes and search for "data".
binary_block = wav_file.read(200)
offset = binary_block.find(b"data")
if offset == -1:
raise ValueError("WAV sub chunk 2 ID not found")
return (format, sample_rate, bits_per_sample, 44 + offset)
def __del__(self):
if self.__enable is not None:
self.__enable.off()
if self.__audio_out is not None:
self.__audio_out.deinit() # Deinit any active I2S comms
class WavFile:
HEADER_SIZE = 36
def __init__(self, path):
self._file = open(path, "rb")
self._data_offset = HEADER_SIZE
header = self._file.read(HEADER_SIZE)
It would be cool if Tone could be implemented this same way- just a class you set up and it generates an appropriate stream of samples. This would empower users to follow that pattern and make their own audio-generating classes... ala a pure Python version of our Synth for example 😆 I am loathe to proceed any further with this since I got nerd sniped by the audio pop and got carried away. It's your baby! |
Could we have an MicroPython example for Pico Audio Pack too? 👀 |
I don't recall if we quite tackled all the bugs here, but I've merged these with the expectation that they'll get more visibility in the main codebase than here. Thanks all! |
Adding i2s audio examples for Cosmic, Galactic and Stellar Unicorn.