Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Multithreaded rubberband #13143

Merged
merged 9 commits into from
Jun 8, 2024
3 changes: 3 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3245,6 +3245,9 @@ if(RUBBERBAND)
target_sources(mixxx-lib PRIVATE
src/effects/backends/builtin/pitchshifteffect.cpp
src/engine/bufferscalers/enginebufferscalerubberband.cpp
src/engine/bufferscalers/rubberbandwrapper.cpp
src/engine/bufferscalers/rubberbandtask.cpp
src/engine/bufferscalers/rubberbandworkerpool.cpp
)
endif()

Expand Down
9 changes: 9 additions & 0 deletions src/coreservices.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@
#include "database/mixxxdb.h"
#include "effects/effectsmanager.h"
#include "engine/enginemixer.h"
#ifdef __RUBBERBAND__
#include "engine/bufferscalers/rubberbandworkerpool.h"
#endif
#include "library/coverartcache.h"
#include "library/library.h"
#include "library/library_prefs.h"
Expand Down Expand Up @@ -267,6 +270,9 @@ void CoreServices::initialize(QApplication* pApp) {
m_pEffectsManager.get(),
pChannelHandleFactory,
true);
#ifdef __RUBBERBAND__
RubberBandWorkerPool::createInstance(pConfig);
#endif

emit initializationProgressUpdate(30, tr("audio interface"));
// Although m_pSoundManager is created here, m_pSoundManager->setupDevices()
Expand Down Expand Up @@ -608,6 +614,9 @@ void CoreServices::finalize() {
// EngineMixer depends on Config and m_pEffectsManager.
qDebug() << t.elapsed(false).debugMillisWithUnit() << "deleting EngineMixer";
CLEAR_AND_CHECK_DELETED(m_pEngine);
#ifdef __RUBBERBAND__
RubberBandWorkerPool::destroy();
#endif

// Destroy PlayerInfo explicitly to release the track
// pointers of tracks that were still loaded in decks
Expand Down
91 changes: 43 additions & 48 deletions src/engine/bufferscalers/enginebufferscalerubberband.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@
#include "util/counter.h"
#include "util/defs.h"
#include "util/math.h"
#include "util/mutex.h"
#include "util/sample.h"
#include "util/timer.h"

using RubberBand::RubberBandStretcher;

Expand Down Expand Up @@ -55,7 +57,7 @@ void EngineBufferScaleRubberBand::setScaleParameters(double base_rate,

if (pitchScale > 0) {
//qDebug() << "EngineBufferScaleRubberBand setPitchScale" << *pitch << pitchScale;
m_pRubberBand->setPitchScale(pitchScale);
m_rubberBand.setPitchScale(pitchScale);
}

// RubberBand handles checking for whether the change in timeRatio is a
Expand All @@ -65,20 +67,20 @@ void EngineBufferScaleRubberBand::setScaleParameters(double base_rate,
double timeRatioInverse = base_rate * speed_abs;
if (timeRatioInverse > 0) {
//qDebug() << "EngineBufferScaleRubberBand setTimeRatio" << 1 / timeRatioInverse;
m_pRubberBand->setTimeRatio(1.0 / timeRatioInverse);
m_rubberBand.setTimeRatio(1.0 / timeRatioInverse);
}

if (runningEngineVersion() == 2) {
if (m_pRubberBand->getInputIncrement() == 0) {
if (m_rubberBand.getInputIncrement() == 0) {
qWarning() << "EngineBufferScaleRubberBand inputIncrement is 0."
<< "On RubberBand <=1.8.1 a SIGFPE is imminent despite"
<< "our workaround. Taking evasive action."
<< "Please file an issue on https://github.com/mixxxdj/mixxx/issues";

// This is much slower than the minimum seek speed workaround above.
while (m_pRubberBand->getInputIncrement() == 0) {
while (m_rubberBand.getInputIncrement() == 0) {
timeRatioInverse += 0.001;
m_pRubberBand->setTimeRatio(1.0 / timeRatioInverse);
m_rubberBand.setTimeRatio(1.0 / timeRatioInverse);
}
speed_abs = timeRatioInverse / base_rate;
*pTempoRatio = m_bBackwards ? -speed_abs : speed_abs;
Expand All @@ -94,8 +96,8 @@ void EngineBufferScaleRubberBand::onSampleRateChanged() {
// TODO: Resetting the sample rate will cause internal
// memory allocations that may block the real-time thread.
// When is this function actually invoked??
m_rubberBand.clear();
if (!getOutputSignal().isValid()) {
m_pRubberBand.reset();
return;
}
RubberBandStretcher::Options rubberbandOptions =
Expand All @@ -110,19 +112,19 @@ void EngineBufferScaleRubberBand::onSampleRateChanged() {
}
#endif

m_pRubberBand = std::make_unique<RubberBandStretcher>(
m_rubberBand.setup(
getOutputSignal().getSampleRate(),
getOutputSignal().getChannelCount(),
rubberbandOptions);
// Setting the time ratio to a very high value will cause RubberBand
// to preallocate buffers large enough to (almost certainly)
// avoid memory reallocations during playback.
m_pRubberBand->setTimeRatio(2.0);
m_pRubberBand->setTimeRatio(1.0);
m_rubberBand.setTimeRatio(2.0);
m_rubberBand.setTimeRatio(1.0);
}

void EngineBufferScaleRubberBand::clear() {
VERIFY_OR_DEBUG_ASSERT(m_pRubberBand) {
VERIFY_OR_DEBUG_ASSERT(m_rubberBand.isValid()) {
return;
}
reset();
Expand All @@ -131,14 +133,18 @@ void EngineBufferScaleRubberBand::clear() {
SINT EngineBufferScaleRubberBand::retrieveAndDeinterleave(
CSAMPLE* pBuffer,
SINT frames) {
const SINT frames_available = m_pRubberBand->available();
VERIFY_OR_DEBUG_ASSERT(m_rubberBand.isValid()) {
return 0;
}
// NOTE: If we still need to throw away padding, then we can also
// immediately read those frames in addition to the frames we actually
// need for the output
const SINT frames_to_read = math_min(frames_available, frames + m_remainingPaddingInOutput);
DEBUG_ASSERT(frames_to_read <= m_buffers[0].size());
SINT received_frames = static_cast<SINT>(m_pRubberBand->retrieve(
m_bufferPtrs.data(), frames_to_read));
SINT received_frames;
{
ScopedTimer t(QStringLiteral("RubberBand::retrieve"));
received_frames = static_cast<SINT>(m_rubberBand.retrieve(
m_bufferPtrs.data(), frames + m_remainingPaddingInOutput, m_buffers[0].size()));
}
SINT frame_offset = 0;

// As explained below in `reset()`, the first time this is called we need to
Expand All @@ -164,6 +170,9 @@ SINT EngineBufferScaleRubberBand::retrieveAndDeinterleave(
void EngineBufferScaleRubberBand::deinterleaveAndProcess(
const CSAMPLE* pBuffer,
SINT frames) {
VERIFY_OR_DEBUG_ASSERT(m_rubberBand.isValid()) {
return;
}
DEBUG_ASSERT(frames <= static_cast<SINT>(m_buffers[0].size()));

SampleUtil::deinterleaveBuffer(
Expand All @@ -172,14 +181,21 @@ void EngineBufferScaleRubberBand::deinterleaveAndProcess(
pBuffer,
frames);

m_pRubberBand->process(m_bufferPtrs.data(),
frames,
false);
{
ScopedTimer t(QStringLiteral("RubberBand::process"));
m_rubberBand.process(m_bufferPtrs.data(),
frames,
false);
}
}

double EngineBufferScaleRubberBand::scaleBuffer(
CSAMPLE* pOutputBuffer,
SINT iOutputBufferSize) {
VERIFY_OR_DEBUG_ASSERT(m_rubberBand.isValid()) {
return 0.0;
}
ScopedTimer t(QStringLiteral("EngineBufferScaleRubberBand::scaleBuffer"));
if (m_dBaseRate == 0.0 || m_dTempoRatio == 0.0) {
SampleUtil::clear(pOutputBuffer, iOutputBufferSize);
// No actual samples/frames have been read from the
Expand All @@ -206,7 +222,7 @@ double EngineBufferScaleRubberBand::scaleBuffer(
read += getOutputSignal().frames2samples(received_frames);

const SINT next_block_frames_required =
static_cast<SINT>(m_pRubberBand->getSamplesRequired());
static_cast<SINT>(m_rubberBand.getSamplesRequired());
if (remaining_frames > 0 && next_block_frames_required > 0) {
// The requested setting becomes effective after all previous frames have been processed
m_effectiveRate = m_dBaseRate * m_dTempoRatio;
Expand Down Expand Up @@ -266,44 +282,20 @@ void EngineBufferScaleRubberBand::useEngineFiner(bool enable) {
}
}

// See
// https://github.com/breakfastquay/rubberband/commit/72654b04ea4f0707e214377515119e933efbdd6c
// for how these two functions were implemented within librubberband itself
size_t EngineBufferScaleRubberBand::getPreferredStartPad() const {
#if RUBBERBANDV3
return m_pRubberBand->getPreferredStartPad();
#else
// `getPreferredStartPad()` returns `window_size / 2`, while with
// `getLatency()` both time stretching engines return `window_size / 2 /
// pitch_scale`
return static_cast<size_t>(std::ceil(
m_pRubberBand->getLatency() * m_pRubberBand->getPitchScale()));
#endif
return m_rubberBand.getPreferredStartPad();
}

size_t EngineBufferScaleRubberBand::getStartDelay() const {
#if RUBBERBANDV3
return m_pRubberBand->getStartDelay();
#else
// In newer Rubber Band versions `getLatency()` is a deprecated alias for
// `getStartDelay()`, so they should behave the same. In the commit linked
// above the behavior was different for the R3 stretcher, but that was only
// during the initial betas of Rubberband 3.0 so we shouldn't have to worry
// about that.
return m_pRubberBand->getLatency();
#endif
return m_rubberBand.getStartDelay();
}

int EngineBufferScaleRubberBand::runningEngineVersion() {
#if RUBBERBANDV3
return m_pRubberBand->getEngineVersion();
#else
return 2;
#endif
return m_rubberBand.getEngineVersion();
}

void EngineBufferScaleRubberBand::reset() {
m_pRubberBand->reset();
m_rubberBand.reset();

// As mentioned in the docs (https://breakfastquay.com/rubberband/code-doc/)
// and FAQ (https://breakfastquay.com/rubberband/integration.html#faqs), you
Expand All @@ -319,7 +311,10 @@ void EngineBufferScaleRubberBand::reset() {
std::fill_n(m_buffers[1].span().begin(), block_size, 0.0f);
while (remaining_padding > 0) {
const size_t pad_samples = std::min<size_t>(remaining_padding, block_size);
m_pRubberBand->process(m_bufferPtrs.data(), pad_samples, false);
{
ScopedTimer t(QStringLiteral("RubberBand::process"));
m_rubberBand.process(m_bufferPtrs.data(), pad_samples, false);
}

remaining_padding -= pad_samples;
}
Expand Down
3 changes: 2 additions & 1 deletion src/engine/bufferscalers/enginebufferscalerubberband.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include <memory>

#include "engine/bufferscalers/enginebufferscale.h"
#include "engine/bufferscalers/rubberbandwrapper.h"
#include "util/samplebuffer.h"

class ReadAheadManager;
Expand Down Expand Up @@ -62,7 +63,7 @@ class EngineBufferScaleRubberBand final : public EngineBufferScale {
// The read-ahead manager that we use to fetch samples
ReadAheadManager* m_pReadAheadManager;

std::unique_ptr<RubberBand::RubberBandStretcher> m_pRubberBand;
RubberBandWrapper m_rubberBand;

/// The audio buffers samples used to send audio to Rubber Band and to
/// receive processed audio from Rubber Band. This is needed because Mixxx
Expand Down
42 changes: 42 additions & 0 deletions src/engine/bufferscalers/rubberbandtask.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
#include "engine/bufferscalers/rubberbandtask.h"

#include "engine/engine.h"
#include "util/assert.h"
#include "util/compatibility/qmutex.h"

RubberBandTask::RubberBandTask(
size_t sampleRate, size_t channels, Options options)
: RubberBand::RubberBandStretcher(sampleRate, channels, options),
QRunnable(),
m_completedSema(0),
m_input(nullptr),
m_samples(0),
m_isFinal(false) {
setAutoDelete(false);
}

void RubberBandTask::set(const float* const* input,
size_t samples,
bool isFinal) {
DEBUG_ASSERT(m_completedSema.available() == 0);
m_input = input;
m_samples = samples;
m_isFinal = isFinal;
}

void RubberBandTask::waitReady() {
VERIFY_OR_DEBUG_ASSERT(m_input && m_samples) {
return;
};
m_completedSema.acquire();
}

void RubberBandTask::run() {
VERIFY_OR_DEBUG_ASSERT(m_completedSema.available() == 0 && m_input && m_samples) {
return;
};
process(m_input,
m_samples,
m_isFinal);
m_completedSema.release();
}
41 changes: 41 additions & 0 deletions src/engine/bufferscalers/rubberbandtask.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
#pragma once

#include <rubberband/RubberBandStretcher.h>

#include <QRunnable>
#include <QSemaphore>
#include <atomic>

#include "audio/types.h"

using RubberBand::RubberBandStretcher;

class RubberBandTask : public RubberBandStretcher, public QRunnable {
public:
RubberBandTask(size_t sampleRate,
size_t channels,
Options options = DefaultOptions);

/// @brief Submit a new stretching task
/// @param stretcher the RubberBand::RubberBandStretcher instance to use.
/// Must remain valid till waitReady() as returned
/// @param input The samples buffer
/// @param samples the samples count
/// @param final whether or not this is the final buffer
acolombier marked this conversation as resolved.
Show resolved Hide resolved
void set(const float* const* input,
size_t samples,
bool isFinal);

// Wait for the current task to complete.
void waitReady();

void run();

private:
// Whether or not the scheduled job as completed
QSemaphore m_completedSema;

const float* const* m_input;
size_t m_samples;
bool m_isFinal;
};
29 changes: 29 additions & 0 deletions src/engine/bufferscalers/rubberbandworkerpool.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
#include "engine/bufferscalers/rubberbandworkerpool.h"

#include <rubberband/RubberBandStretcher.h>

#include "engine/engine.h"
#include "util/assert.h"

RubberBandWorkerPool::RubberBandWorkerPool(UserSettingsPointer pConfig)
: QThreadPool() {
bool multiThreadedOnStereo = pConfig &&
pConfig->getValue(ConfigKey(QStringLiteral("[App]"),
QStringLiteral("keylock_multithreading")),
false);
m_channelPerWorker = multiThreadedOnStereo
? mixxx::audio::ChannelCount::mono()
: mixxx::audio::ChannelCount::stereo();
DEBUG_ASSERT(mixxx::kEngineChannelCount % m_channelPerWorker == 0);

setThreadPriority(QThread::HighPriority);
setMaxThreadCount(mixxx::kEngineChannelCount / m_channelPerWorker - 1);

// We allocate one runner less than the total of maximum supported channel,
// so the engine thread will also perform a stretching operation, instead of
// waiting all workers to complete. During performance testing, this ahas
// show better results
for (int w = 0; w < maxThreadCount(); w++) {
reserveThread();
}
}
Loading