Skip to content

Commit

Permalink
Few nits and using QThreadPool/QRunnable
Browse files Browse the repository at this point in the history
  • Loading branch information
acolombier committed May 31, 2024
1 parent 05df96d commit 406fc3a
Show file tree
Hide file tree
Showing 10 changed files with 126 additions and 176 deletions.
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3246,7 +3246,7 @@ if(RUBBERBAND)
src/effects/backends/builtin/pitchshifteffect.cpp
src/engine/bufferscalers/enginebufferscalerubberband.cpp
src/engine/bufferscalers/rubberbandwrapper.cpp
src/engine/bufferscalers/rubberbandworker.cpp
src/engine/bufferscalers/rubberbandtask.cpp
src/engine/bufferscalers/rubberbandworkerpool.cpp
)
endif()
Expand Down
48 changes: 48 additions & 0 deletions src/engine/bufferscalers/rubberbandtask.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
#include "engine/bufferscalers/rubberbandtask.h"

#include "engine/engine.h"
#include "util/assert.h"
#include "util/compatibility/qmutex.h"

RubberBandTask::RubberBandTask(
size_t sampleRate, size_t channels, Options options)
: RubberBand::RubberBandStretcher(sampleRate, channels, options),
QRunnable(),
m_completed(false),
m_input(nullptr),
m_samples(0),
m_isFinal(false) {
setAutoDelete(false);
}

void RubberBandTask::set(const float* const* input,
size_t samples,
bool isFinal) {
auto locker = lockMutex(&m_waitLock);
m_completed = false;
m_input = input;
m_samples = samples;
m_isFinal = isFinal;
}

void RubberBandTask::waitReady() {
auto locker = lockMutex(&m_waitLock);
VERIFY_OR_DEBUG_ASSERT(m_input && m_samples) {
return;
};
while (!m_completed) {
m_waitCondition.wait(&m_waitLock);
}
}

void RubberBandTask::run() {
auto locker = lockMutex(&m_waitLock);
VERIFY_OR_DEBUG_ASSERT(!m_completed && m_input && m_samples) {
return;
};
process(m_input,
m_samples,
m_isFinal);
m_completed = true;
m_waitCondition.wakeOne();
}
45 changes: 45 additions & 0 deletions src/engine/bufferscalers/rubberbandtask.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
#pragma once

#include <rubberband/RubberBandStretcher.h>

#include <QMutex>
#include <QRunnable>
#include <QWaitCondition>
#include <atomic>

#include "audio/types.h"

using RubberBand::RubberBandStretcher;

class RubberBandTask : public RubberBandStretcher, public QRunnable {
public:
RubberBandTask(size_t sampleRate,
size_t channels,
Options options = DefaultOptions);

/// @brief Submit a new stretching task
/// @param stretcher the RubberBand::RubberBandStretcher instance to use.
/// Must remain valid till waitReady() as returned
/// @param input The samples buffer
/// @param samples the samples count
/// @param final whether or not this is the final buffer
void set(const float* const* input,
size_t samples,
bool isFinal);

// Wait for the current task to complete.
void waitReady();

void run();

private:
// Used to schedule the thread
QMutex m_waitLock;
QWaitCondition m_waitCondition;
// Whether or not the scheduled job as completed
bool m_completed;

const float* const* m_input;
size_t m_samples;
bool m_isFinal;
};
56 changes: 0 additions & 56 deletions src/engine/bufferscalers/rubberbandworker.cpp

This file was deleted.

49 changes: 0 additions & 49 deletions src/engine/bufferscalers/rubberbandworker.h

This file was deleted.

34 changes: 8 additions & 26 deletions src/engine/bufferscalers/rubberbandworkerpool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@

#include <rubberband/RubberBandStretcher.h>

#include "engine/bufferscalers/rubberbandworker.h"
#include "engine/engine.h"
#include "util/assert.h"

RubberBandWorkerPool::RubberBandWorkerPool(UserSettingsPointer pConfig) {
RubberBandWorkerPool::RubberBandWorkerPool(UserSettingsPointer pConfig)
: QThreadPool() {
bool multiThreadedOnStereo = pConfig &&
pConfig->getValue(ConfigKey(QStringLiteral("[App]"),
QStringLiteral("keylock_multithreading")),
Expand All @@ -16,36 +16,18 @@ RubberBandWorkerPool::RubberBandWorkerPool(UserSettingsPointer pConfig) {
: mixxx::audio::ChannelCount::stereo();
DEBUG_ASSERT(0 == mixxx::kEngineChannelCount % m_channelPerWorker);

setThreadPriority(QThread::HighPriority);
setMaxThreadCount(mixxx::kEngineChannelCount / m_channelPerWorker);

// We allocate one runner less than the total of maximum supported channel,
// so the engine thread will also perform a stretching operation, instead of
// waiting all workers to complete. During performance testing, this ahas
// show better results
for (int w = 1; w < mixxx::kEngineChannelCount / m_channelPerWorker; w++) {
m_workers.emplace_back(
// We cannot use make_unique here because RubberBandWorker ctor
// is protected to prevent direct usage.
new RubberBandWorker);
m_workers.back()->start(QThread::HighPriority);
for (int w = 0; w < maxThreadCount(); w++) {
reserveThread();
}
}

RubberBandWorkerPool::~RubberBandWorkerPool() {
for (auto& member : m_workers) {
member->stop();
}
m_workers.clear();
}

RubberBandWorker* RubberBandWorkerPool::submit(
RubberBand::RubberBandStretcher* stretcher,
const float* const* input,
size_t samples,
bool final) {
for (auto& member : m_workers) {
if (!member->m_assigned.test_and_set(std::memory_order_acquire)) {
member->schedule(stretcher, input, samples, final);
return member.get();
}
}
return nullptr;
waitForDone();
}
24 changes: 4 additions & 20 deletions src/engine/bufferscalers/rubberbandworkerpool.h
Original file line number Diff line number Diff line change
@@ -1,34 +1,18 @@
#pragma once

#include <QThreadPool>

#include "audio/types.h"
#include "preferences/usersettings.h"
#include "util/singleton.h"

class RubberBandWorker;
namespace RubberBand {
class RubberBandStretcher;
}

// RubberBandWorkerPool is a global pool manager for RubberBandWorkerPool. It
// allows a the Engine thread to use a pool of agnostic RubberBandWorker which
// can be distributed stretching job
class RubberBandWorkerPool : public Singleton<RubberBandWorkerPool> {
class RubberBandWorkerPool : public QThreadPool, public Singleton<RubberBandWorkerPool> {
public:
~RubberBandWorkerPool();

/// @brief Submit a new stretching job
/// @param stretcher the RubberBand::RubberBandStretcher instance to use.
/// Must remain valid till waitReady() as returned
/// @param input The samples buffer
/// @param samples the samples count
/// @param final whether or not this is the final buffer
/// @return the worker on which the job as been schedule one, or null if
/// none available
RubberBandWorker* submit(RubberBand::RubberBandStretcher* stretcher,
const float* const* input,
size_t samples,
bool final);

const mixxx::audio::ChannelCount& channelPerWorker() const {
return m_channelPerWorker;
}
Expand All @@ -37,7 +21,7 @@ class RubberBandWorkerPool : public Singleton<RubberBandWorkerPool> {
RubberBandWorkerPool(UserSettingsPointer pConfig = nullptr);

private:
std::vector<std::unique_ptr<RubberBandWorker>> m_workers;
;
mixxx::audio::ChannelCount m_channelPerWorker;

friend class Singleton<RubberBandWorkerPool>;
Expand Down
28 changes: 12 additions & 16 deletions src/engine/bufferscalers/rubberbandwrapper.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
#include "engine/bufferscalers/rubberbandwrapper.h"

#include "engine/bufferscalers/rubberbandworker.h"
#include "engine/bufferscalers/rubberbandworkerpool.h"
#include "engine/engine.h"
#include "util/assert.h"
Expand Down Expand Up @@ -44,7 +43,7 @@ int RubberBandWrapper::available() const {
for (auto& stretcher : m_pInstances) {

Check warning on line 43 in src/engine/bufferscalers/rubberbandwrapper.cpp

View workflow job for this annotation

GitHub Actions / clang-tidy

'auto &stretcher' can be declared as 'const auto &stretcher' [readability-qualified-auto]
available = qMin(available, stretcher->available());
}
return available;
return available == std::numeric_limits<int>::max() ? 0 : available;
}
size_t RubberBandWrapper::retrieve(float* const* output, size_t samples) const {
if (m_pInstances.size() == 1) {
Expand Down Expand Up @@ -110,23 +109,20 @@ size_t RubberBandWrapper::getStartDelay() const {
return m_pInstances[0]->getLatency();
#endif
}
void RubberBandWrapper::process(const float* const* input, size_t samples, bool final) {
void RubberBandWrapper::process(const float* const* input, size_t samples, bool isFinal) {
if (m_pInstances.size() == 1) {
return m_pInstances[0]->process(input, samples, final);
return m_pInstances[0]->process(input, samples, isFinal);
} else {
RubberBandWorkerPool* pPool = RubberBandWorkerPool::instance();
QSet<RubberBandWorker*> workers;
for (auto& instance : m_pInstances) {
auto pWorker = pPool->submit(instance.get(), input, samples, final);
if (!pWorker) {
instance->process(input, samples, final);
} else {
workers.insert(pWorker);
for (auto& pInstance : m_pInstances) {
pInstance->set(input, samples, isFinal);
if (!pPool->tryStart(pInstance.get())) {
pInstance->run();
}
input += pPool->channelPerWorker();
}
for (auto& worker : workers) {
worker->waitReady();
for (auto& pInstance : m_pInstances) {
pInstance->waitReady();
}
}
}
Expand All @@ -150,14 +146,14 @@ void RubberBandWrapper::setup(mixxx::audio::SampleRate sampleRate,
qDebug() << "RubberBandWrapper::setup" << channelPerWorker;
VERIFY_OR_DEBUG_ASSERT(0 == chCount % channelPerWorker) {
m_pInstances.emplace_back(
std::make_unique<RubberBand::RubberBandStretcher>(
std::make_unique<RubberBandTask>(
sampleRate, chCount, opt));
return;
}

for (int c = 0; c < chCount; c += channelPerWorker) {
m_pInstances.emplace_back(
std::make_unique<RubberBand::RubberBandStretcher>(
std::make_unique<RubberBandTask>(
sampleRate, channelPerWorker, opt));
}
}
Expand All @@ -168,5 +164,5 @@ void RubberBandWrapper::setPitchScale(double scale) {
}

bool RubberBandWrapper::isValid() const {
return m_pInstances.size();
return !m_pInstances.empty();
}
Loading

0 comments on commit 406fc3a

Please sign in to comment.