From 406fc3aacbd0d5a4c7949f2b78ecd35956ed57df Mon Sep 17 00:00:00 2001 From: Antoine C Date: Fri, 31 May 2024 20:02:24 +0100 Subject: [PATCH] Few nits and using QThreadPool/QRunnable --- CMakeLists.txt | 2 +- src/engine/bufferscalers/rubberbandtask.cpp | 48 ++++++++++++++++ src/engine/bufferscalers/rubberbandtask.h | 45 +++++++++++++++ src/engine/bufferscalers/rubberbandworker.cpp | 56 ------------------- src/engine/bufferscalers/rubberbandworker.h | 49 ---------------- .../bufferscalers/rubberbandworkerpool.cpp | 34 +++-------- .../bufferscalers/rubberbandworkerpool.h | 24 ++------ .../bufferscalers/rubberbandwrapper.cpp | 28 ++++------ src/engine/bufferscalers/rubberbandwrapper.h | 8 +-- src/test/signalpathtest.h | 8 +-- 10 files changed, 126 insertions(+), 176 deletions(-) create mode 100644 src/engine/bufferscalers/rubberbandtask.cpp create mode 100644 src/engine/bufferscalers/rubberbandtask.h delete mode 100644 src/engine/bufferscalers/rubberbandworker.cpp delete mode 100644 src/engine/bufferscalers/rubberbandworker.h diff --git a/CMakeLists.txt b/CMakeLists.txt index 6913fb3402a..bb5582838c9 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -3246,7 +3246,7 @@ if(RUBBERBAND) src/effects/backends/builtin/pitchshifteffect.cpp src/engine/bufferscalers/enginebufferscalerubberband.cpp src/engine/bufferscalers/rubberbandwrapper.cpp - src/engine/bufferscalers/rubberbandworker.cpp + src/engine/bufferscalers/rubberbandtask.cpp src/engine/bufferscalers/rubberbandworkerpool.cpp ) endif() diff --git a/src/engine/bufferscalers/rubberbandtask.cpp b/src/engine/bufferscalers/rubberbandtask.cpp new file mode 100644 index 00000000000..76badc0a7c7 --- /dev/null +++ b/src/engine/bufferscalers/rubberbandtask.cpp @@ -0,0 +1,48 @@ +#include "engine/bufferscalers/rubberbandtask.h" + +#include "engine/engine.h" +#include "util/assert.h" +#include "util/compatibility/qmutex.h" + +RubberBandTask::RubberBandTask( + size_t sampleRate, size_t channels, Options options) + : RubberBand::RubberBandStretcher(sampleRate, channels, options), + QRunnable(), + m_completed(false), + m_input(nullptr), + m_samples(0), + m_isFinal(false) { + setAutoDelete(false); +} + +void RubberBandTask::set(const float* const* input, + size_t samples, + bool isFinal) { + auto locker = lockMutex(&m_waitLock); + m_completed = false; + m_input = input; + m_samples = samples; + m_isFinal = isFinal; +} + +void RubberBandTask::waitReady() { + auto locker = lockMutex(&m_waitLock); + VERIFY_OR_DEBUG_ASSERT(m_input && m_samples) { + return; + }; + while (!m_completed) { + m_waitCondition.wait(&m_waitLock); + } +} + +void RubberBandTask::run() { + auto locker = lockMutex(&m_waitLock); + VERIFY_OR_DEBUG_ASSERT(!m_completed && m_input && m_samples) { + return; + }; + process(m_input, + m_samples, + m_isFinal); + m_completed = true; + m_waitCondition.wakeOne(); +} diff --git a/src/engine/bufferscalers/rubberbandtask.h b/src/engine/bufferscalers/rubberbandtask.h new file mode 100644 index 00000000000..a127fb99251 --- /dev/null +++ b/src/engine/bufferscalers/rubberbandtask.h @@ -0,0 +1,45 @@ +#pragma once + +#include + +#include +#include +#include +#include + +#include "audio/types.h" + +using RubberBand::RubberBandStretcher; + +class RubberBandTask : public RubberBandStretcher, public QRunnable { + public: + RubberBandTask(size_t sampleRate, + size_t channels, + Options options = DefaultOptions); + + /// @brief Submit a new stretching task + /// @param stretcher the RubberBand::RubberBandStretcher instance to use. + /// Must remain valid till waitReady() as returned + /// @param input The samples buffer + /// @param samples the samples count + /// @param final whether or not this is the final buffer + void set(const float* const* input, + size_t samples, + bool isFinal); + + // Wait for the current task to complete. + void waitReady(); + + void run(); + + private: + // Used to schedule the thread + QMutex m_waitLock; + QWaitCondition m_waitCondition; + // Whether or not the scheduled job as completed + bool m_completed; + + const float* const* m_input; + size_t m_samples; + bool m_isFinal; +}; diff --git a/src/engine/bufferscalers/rubberbandworker.cpp b/src/engine/bufferscalers/rubberbandworker.cpp deleted file mode 100644 index 8e6669d61d9..00000000000 --- a/src/engine/bufferscalers/rubberbandworker.cpp +++ /dev/null @@ -1,56 +0,0 @@ -#include "engine/bufferscalers/rubberbandworker.h" - -#include "engine/engine.h" -#include "moc_rubberbandworker.cpp" -#include "util/assert.h" -#include "util/compatibility/qmutex.h" - -using RubberBand::RubberBandStretcher; - -RubberBandWorker::RubberBandWorker() - : QThread(), - m_currentJob(Job{nullptr, nullptr, 0, false}), - m_completed(false) { -} - -void RubberBandWorker::schedule(RubberBand::RubberBandStretcher* stretcher, - const float* const* input, - size_t samples, - bool final) { - auto locker = lockMutex(&m_waitLock); - m_currentJob.instance = stretcher; - m_currentJob.input = input; - m_currentJob.samples = samples; - m_currentJob.final = final; - m_completed = false; - m_waitCondition.wakeOne(); -} - -void RubberBandWorker::waitReady() { - auto locker = lockMutex(&m_waitLock); - while (!m_completed) { - m_waitCondition.wait(&m_waitLock); - } -} - -void RubberBandWorker::stop() { - requestInterruption(); - m_waitCondition.wakeOne(); - wait(); -} -void RubberBandWorker::run() { - auto locker = lockMutex(&m_waitLock); - while (!isInterruptionRequested()) { - if (!m_completed && m_currentJob.instance && m_currentJob.input && m_currentJob.samples) { - m_currentJob.instance->process(m_currentJob.input, - m_currentJob.samples, - m_currentJob.final); - m_completed = true; - DEBUG_ASSERT(m_assigned.test(std::memory_order_relaxed)); - m_assigned.clear(std::memory_order_release); - m_waitCondition.wakeOne(); - } - m_waitCondition.wait(&m_waitLock); - } - quit(); -} diff --git a/src/engine/bufferscalers/rubberbandworker.h b/src/engine/bufferscalers/rubberbandworker.h deleted file mode 100644 index 392e9b70c60..00000000000 --- a/src/engine/bufferscalers/rubberbandworker.h +++ /dev/null @@ -1,49 +0,0 @@ -#pragma once - -#include - -#include -#include -#include -#include - -#include "audio/types.h" - -class RubberBandWorkerPool; -class RubberBandWorker : public QThread { - Q_OBJECT - public: - // Wait for the current job to complete. - void waitReady(); - - protected: - RubberBandWorker(); - - void run() override; - void schedule(RubberBand::RubberBandStretcher* stretcher, - const float* const* input, - size_t samples, - bool final); - void stop(); - - private: - struct Job { - RubberBand::RubberBandStretcher* instance; - const float* const* input; - size_t samples; - bool final; - }; - /// Contains the scheduled job. May be dangling if completed=true - Job m_currentJob; - // Used to schedule the thread - QMutex m_waitLock; - QWaitCondition m_waitCondition; - // Whether or not the scheduled job as completed - bool m_completed; - - // Used by RubberBandWorkerPool to manage thread availability. - // RubberBandWorker only clears the flag, once the job is completed - std::atomic_flag m_assigned; - - friend RubberBandWorkerPool; -}; diff --git a/src/engine/bufferscalers/rubberbandworkerpool.cpp b/src/engine/bufferscalers/rubberbandworkerpool.cpp index 8dea16a7600..077d4877317 100644 --- a/src/engine/bufferscalers/rubberbandworkerpool.cpp +++ b/src/engine/bufferscalers/rubberbandworkerpool.cpp @@ -2,11 +2,11 @@ #include -#include "engine/bufferscalers/rubberbandworker.h" #include "engine/engine.h" #include "util/assert.h" -RubberBandWorkerPool::RubberBandWorkerPool(UserSettingsPointer pConfig) { +RubberBandWorkerPool::RubberBandWorkerPool(UserSettingsPointer pConfig) + : QThreadPool() { bool multiThreadedOnStereo = pConfig && pConfig->getValue(ConfigKey(QStringLiteral("[App]"), QStringLiteral("keylock_multithreading")), @@ -16,36 +16,18 @@ RubberBandWorkerPool::RubberBandWorkerPool(UserSettingsPointer pConfig) { : mixxx::audio::ChannelCount::stereo(); DEBUG_ASSERT(0 == mixxx::kEngineChannelCount % m_channelPerWorker); + setThreadPriority(QThread::HighPriority); + setMaxThreadCount(mixxx::kEngineChannelCount / m_channelPerWorker); + // We allocate one runner less than the total of maximum supported channel, // so the engine thread will also perform a stretching operation, instead of // waiting all workers to complete. During performance testing, this ahas // show better results - for (int w = 1; w < mixxx::kEngineChannelCount / m_channelPerWorker; w++) { - m_workers.emplace_back( - // We cannot use make_unique here because RubberBandWorker ctor - // is protected to prevent direct usage. - new RubberBandWorker); - m_workers.back()->start(QThread::HighPriority); + for (int w = 0; w < maxThreadCount(); w++) { + reserveThread(); } } RubberBandWorkerPool::~RubberBandWorkerPool() { - for (auto& member : m_workers) { - member->stop(); - } - m_workers.clear(); -} - -RubberBandWorker* RubberBandWorkerPool::submit( - RubberBand::RubberBandStretcher* stretcher, - const float* const* input, - size_t samples, - bool final) { - for (auto& member : m_workers) { - if (!member->m_assigned.test_and_set(std::memory_order_acquire)) { - member->schedule(stretcher, input, samples, final); - return member.get(); - } - } - return nullptr; + waitForDone(); } diff --git a/src/engine/bufferscalers/rubberbandworkerpool.h b/src/engine/bufferscalers/rubberbandworkerpool.h index 087be5eca3b..d6887058c91 100644 --- a/src/engine/bufferscalers/rubberbandworkerpool.h +++ b/src/engine/bufferscalers/rubberbandworkerpool.h @@ -1,34 +1,18 @@ #pragma once +#include + #include "audio/types.h" #include "preferences/usersettings.h" #include "util/singleton.h" -class RubberBandWorker; -namespace RubberBand { -class RubberBandStretcher; -} - // RubberBandWorkerPool is a global pool manager for RubberBandWorkerPool. It // allows a the Engine thread to use a pool of agnostic RubberBandWorker which // can be distributed stretching job -class RubberBandWorkerPool : public Singleton { +class RubberBandWorkerPool : public QThreadPool, public Singleton { public: ~RubberBandWorkerPool(); - /// @brief Submit a new stretching job - /// @param stretcher the RubberBand::RubberBandStretcher instance to use. - /// Must remain valid till waitReady() as returned - /// @param input The samples buffer - /// @param samples the samples count - /// @param final whether or not this is the final buffer - /// @return the worker on which the job as been schedule one, or null if - /// none available - RubberBandWorker* submit(RubberBand::RubberBandStretcher* stretcher, - const float* const* input, - size_t samples, - bool final); - const mixxx::audio::ChannelCount& channelPerWorker() const { return m_channelPerWorker; } @@ -37,7 +21,7 @@ class RubberBandWorkerPool : public Singleton { RubberBandWorkerPool(UserSettingsPointer pConfig = nullptr); private: - std::vector> m_workers; + ; mixxx::audio::ChannelCount m_channelPerWorker; friend class Singleton; diff --git a/src/engine/bufferscalers/rubberbandwrapper.cpp b/src/engine/bufferscalers/rubberbandwrapper.cpp index c5a1a5cbe2b..f22a4e1deae 100644 --- a/src/engine/bufferscalers/rubberbandwrapper.cpp +++ b/src/engine/bufferscalers/rubberbandwrapper.cpp @@ -1,6 +1,5 @@ #include "engine/bufferscalers/rubberbandwrapper.h" -#include "engine/bufferscalers/rubberbandworker.h" #include "engine/bufferscalers/rubberbandworkerpool.h" #include "engine/engine.h" #include "util/assert.h" @@ -44,7 +43,7 @@ int RubberBandWrapper::available() const { for (auto& stretcher : m_pInstances) { available = qMin(available, stretcher->available()); } - return available; + return available == std::numeric_limits::max() ? 0 : available; } size_t RubberBandWrapper::retrieve(float* const* output, size_t samples) const { if (m_pInstances.size() == 1) { @@ -110,23 +109,20 @@ size_t RubberBandWrapper::getStartDelay() const { return m_pInstances[0]->getLatency(); #endif } -void RubberBandWrapper::process(const float* const* input, size_t samples, bool final) { +void RubberBandWrapper::process(const float* const* input, size_t samples, bool isFinal) { if (m_pInstances.size() == 1) { - return m_pInstances[0]->process(input, samples, final); + return m_pInstances[0]->process(input, samples, isFinal); } else { RubberBandWorkerPool* pPool = RubberBandWorkerPool::instance(); - QSet workers; - for (auto& instance : m_pInstances) { - auto pWorker = pPool->submit(instance.get(), input, samples, final); - if (!pWorker) { - instance->process(input, samples, final); - } else { - workers.insert(pWorker); + for (auto& pInstance : m_pInstances) { + pInstance->set(input, samples, isFinal); + if (!pPool->tryStart(pInstance.get())) { + pInstance->run(); } input += pPool->channelPerWorker(); } - for (auto& worker : workers) { - worker->waitReady(); + for (auto& pInstance : m_pInstances) { + pInstance->waitReady(); } } } @@ -150,14 +146,14 @@ void RubberBandWrapper::setup(mixxx::audio::SampleRate sampleRate, qDebug() << "RubberBandWrapper::setup" << channelPerWorker; VERIFY_OR_DEBUG_ASSERT(0 == chCount % channelPerWorker) { m_pInstances.emplace_back( - std::make_unique( + std::make_unique( sampleRate, chCount, opt)); return; } for (int c = 0; c < chCount; c += channelPerWorker) { m_pInstances.emplace_back( - std::make_unique( + std::make_unique( sampleRate, channelPerWorker, opt)); } } @@ -168,5 +164,5 @@ void RubberBandWrapper::setPitchScale(double scale) { } bool RubberBandWrapper::isValid() const { - return m_pInstances.size(); + return !m_pInstances.empty(); } diff --git a/src/engine/bufferscalers/rubberbandwrapper.h b/src/engine/bufferscalers/rubberbandwrapper.h index 080f04b8681..2a2224ba459 100644 --- a/src/engine/bufferscalers/rubberbandwrapper.h +++ b/src/engine/bufferscalers/rubberbandwrapper.h @@ -1,8 +1,7 @@ #pragma once -#include - #include "audio/types.h" +#include "engine/bufferscalers/rubberbandtask.h" /// RubberBandWrapper is a wrapper around RubberBand::RubberBandStretcher which /// allows to distribute signal stretching over multiple instance, but interface @@ -11,7 +10,7 @@ class RubberBandWrapper { public: int getEngineVersion() const; void setTimeRatio(double ratio); - size_t getSamplesRequired() const; + std::size_t getSamplesRequired() const; int available() const; size_t retrieve(float* const* output, size_t samples) const; size_t getInputIncrement() const; @@ -31,5 +30,6 @@ class RubberBandWrapper { bool isValid() const; private: - std::vector> m_pInstances; + // copy constructor of RubberBand::RubberBandStretcher is implicitly deleted. + std::vector> m_pInstances; }; diff --git a/src/test/signalpathtest.h b/src/test/signalpathtest.h index aef1e098d80..a1d4926bfdf 100644 --- a/src/test/signalpathtest.h +++ b/src/test/signalpathtest.h @@ -145,17 +145,17 @@ class BaseSignalPathTest : public MixxxTest, SoundSourceProviderRegistration { PlayerInfo::destroy(); } -#ifdef __RUBBERBAND__ void SetUp() override { +#ifdef __RUBBERBAND__ RubberBandWorkerPool::createInstance(); - } #endif + } -#ifdef __RUBBERBAND__ void TearDown() override { +#ifdef __RUBBERBAND__ RubberBandWorkerPool::destroy(); - } #endif + } void addDeck(EngineDeck* pDeck) { ControlObject::set(ConfigKey(pDeck->getGroup(), "main_mix"), 1.0);