Skip to content

Commit

Permalink
- Cleaned up the code duplication in the futex implementation - main …
Browse files Browse the repository at this point in the history
…and worklet futex now share the code nicely

- Made the 'worklet' environment a proper environment. If disabled none of the worklet overhead is compiled in. When enabled it adds all the needed stuff on top of what's included by 'worker', which is required as soon as USE_PTHREADS=1.
  • Loading branch information
tklajnscek committed Oct 29, 2020
1 parent 15c5101 commit 1b1512a
Show file tree
Hide file tree
Showing 7 changed files with 71 additions and 102 deletions.
5 changes: 4 additions & 1 deletion emcc.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@
}


VALID_ENVIRONMENTS = ('web', 'webview', 'worker', 'node', 'shell')
VALID_ENVIRONMENTS = ('web', 'webview', 'worker', 'node', 'shell', 'worklet')


# this function uses the global 'final' variable, which contains the current
Expand Down Expand Up @@ -290,6 +290,9 @@ def setup_environment_settings():
'worker' in environments or \
(shared.Settings.ENVIRONMENT_MAY_BE_NODE and shared.Settings.USE_PTHREADS)

# Worklet environment must be enabled explicitly for now
shared.Settings.ENVIRONMENT_MAY_BE_WORKLET = 'worklet' in environments

if not shared.Settings.ENVIRONMENT_MAY_BE_WORKER and shared.Settings.PROXY_TO_WORKER:
exit_with_error('If you specify --proxy-to-worker and specify a "-s ENVIRONMENT=" directive, it must include "worker" as a target! (Try e.g. -s ENVIRONMENT=web,worker)')

Expand Down
154 changes: 58 additions & 96 deletions src/library_pthread.js
Original file line number Diff line number Diff line change
Expand Up @@ -116,10 +116,17 @@ var LibraryPThread = {
#if ASSERTIONS
assert(PThread.mainThreadFutex > 0);
#endif
#if ENVIRONMENT_MAY_BE_WORKLET
PThread.workletFutex = _worklet_futex;
#if ASSERTIONS
assert(PThread.workletFutex > 0);
#endif
PThread.spinFutexes = [PThread.mainThreadFutex, PThread.workletFutex];
#else
PThread.spinFutexes = [PThread.mainThreadFutex];
#endif


},
// Maps pthread_t to pthread info objects
pthreads: {},
Expand Down Expand Up @@ -479,6 +486,7 @@ var LibraryPThread = {
}
},

#if ENVIRONMENT_MAY_BE_WORKLET
// Initializes a pthread in the AudioWorkletGlobalScope for this audio context
initAudioWorkletPThread: function(audioCtx, pthreadPtr) {
var aw = audioCtx.audioWorklet;
Expand Down Expand Up @@ -547,6 +555,7 @@ var LibraryPThread = {
})
});
}
#endif
},

$killThread: function(pthread_ptr) {
Expand Down Expand Up @@ -1262,58 +1271,7 @@ var LibraryPThread = {
if (ret === 'not-equal') return -{{{ cDefine('EWOULDBLOCK') }}};
if (ret === 'ok') return 0;
throw 'Atomics.wait returned an unexpected value ' + ret;
} else if (ENVIRONMENT_IS_WORKLET) {
// Worklets use a simple busy loop becuase Atomics.wait is not available in worklets, so simulate it via busy spinning.
// First, check if the value is correct for us to wait on.
if (Atomics.load(HEAP32, addr >> 2) != val) {
return -{{{ cDefine('EWOULDBLOCK') }}};
}

var tNow = performance.now();
var tEnd = tNow + timeout;

#if PTHREADS_PROFILING
PThread.setThreadStatusConditional(_pthread_self(), {{{ cDefine('EM_THREAD_STATUS_RUNNING') }}}, {{{ cDefine('EM_THREAD_STATUS_WAITFUTEX') }}});
#endif
// All worklets use the same global address since they all run on the
// render thread. When zero, the worklet is not waiting on anything, and on
// nonzero, the contents of the address pointed by PThread.workletFutex
// tell which address the worklet is simulating its wait on.
#if ASSERTIONS
assert(PThread.workletFutex > 0);
#endif
var lastAddr = Atomics.exchange(HEAP32, PThread.workletFutex >> 2, addr);
#if ASSERTIONS
// We must not have already been waiting.
assert(lastAddr == 0);
#endif

while (1) {
// Check for a timeout.
tNow = performance.now();
if (tNow > tEnd) {
#if PTHREADS_PROFILING
PThread.setThreadStatusConditional(_pthread_self(), {{{ cDefine('EM_THREAD_STATUS_RUNNING') }}}, {{{ cDefine('EM_THREAD_STATUS_WAITFUTEX') }}});
#endif
// We timed out, so stop marking ourselves as waiting.
lastAddr = Atomics.exchange(HEAP32, PThread.workletFutex >> 2, 0);
#if ASSERTIONS
// The current value must have been our address which we set, or
// in a race it was set to 0 which means another thread just allowed
// us to run, but (tragically) that happened just a bit too late.
assert(lastAddr == addr || lastAddr == 0);
#endif
return -{{{ cDefine('ETIMEDOUT') }}};
}

lastAddr = Atomics.load(HEAP32, PThread.workletFutex >> 2);
if (lastAddr != addr) {
// We were told to stop waiting, so stop.
break;
}
}
}
else {
} else {
// First, check if the value is correct for us to wait on.
if (Atomics.load(HEAP32, addr >> 2) != val) {
return -{{{ cDefine('EWOULDBLOCK') }}};
Expand All @@ -1338,10 +1296,20 @@ var LibraryPThread = {
// ourselves before calling the potentially-recursive call. See below for
// how we handle the case of our futex being notified during the time in
// between when we are not set as the value of mainThreadFutex.
//
// For audio worklets we use the same global address since they all run on
// the audio thread. It's all very similar to the main thread case, except we
// don't have to do any nested call special casing.
var usedFutex = PThread.mainThreadFutex;
#if ENVIRONMENT_MAY_BE_WORKLET
if (ENVIRONMENT_IS_WORKLET) {
usedFutex = PThread.workletFutex;
}
#endif
#if ASSERTIONS
assert(PThread.mainThreadFutex > 0);
assert(usedFutex > 0);
#endif
var lastAddr = Atomics.exchange(HEAP32, PThread.mainThreadFutex >> 2, addr);
var lastAddr = Atomics.exchange(HEAP32, usedFutex >> 2, addr);
#if ASSERTIONS
// We must not have already been waiting.
assert(lastAddr == 0);
Expand All @@ -1355,7 +1323,7 @@ var LibraryPThread = {
PThread.setThreadStatusConditional(_pthread_self(), {{{ cDefine('EM_THREAD_STATUS_RUNNING') }}}, {{{ cDefine('EM_THREAD_STATUS_WAITFUTEX') }}});
#endif
// We timed out, so stop marking ourselves as waiting.
lastAddr = Atomics.exchange(HEAP32, PThread.mainThreadFutex >> 2, 0);
lastAddr = Atomics.exchange(HEAP32, usedFutex >> 2, 0);
#if ASSERTIONS
// The current value must have been our address which we set, or
// in a race it was set to 0 which means another thread just allowed
Expand All @@ -1364,6 +1332,19 @@ var LibraryPThread = {
#endif
return -{{{ cDefine('ETIMEDOUT') }}};
}

#if ENVIRONMENT_MAY_BE_WORKLET
if (ENVIRONMENT_IS_WORKLET) {
// Audio worklet version without any special casing like for the main thread below
lastAddr = Atomics.load(HEAP32, PThread.workletFutex >> 2);
if (lastAddr != addr) {
// We were told to stop waiting, so stop.
break;
}
continue;
}
#endif

// We are performing a blocking loop here, so we must handle proxied
// events from pthreads, to avoid deadlocks.
// Note that we have to do so carefully, as we may take a lock while
Expand Down Expand Up @@ -1439,56 +1420,37 @@ var LibraryPThread = {
// For Atomics.notify() API Infinity is to be passed in that case.
if (count >= {{{ cDefine('INT_MAX') }}}) count = Infinity;

// See if main thread is waiting on this address? If so, wake it up by resetting its wake location to zero.
// Note that this is not a fair procedure, since we always wake main thread first before any workers, so
// See if any spin futex is waiting on this address? If so, wake it up by resetting its wake location to zero.
// Note that this is not a fair procedure, since we always wake these up first before any workers, so
// this scheme does not adhere to real queue-based waiting.
// Spin futexes are used on the main thread and in worklets due to lack of Atomic.wait().
var spinFutexesWoken = 0;
for(var i = 0; i < PThread.spinFutexes.length; ++i) {
var futex = PThread.spinFutexes[i];
#if ASSERTIONS
assert(PThread.mainThreadFutex > 0);
assert(futex > 0);
#endif
var mainThreadWaitAddress = Atomics.load(HEAP32, PThread.mainThreadFutex >> 2);
var mainThreadWoken = 0;
if (mainThreadWaitAddress == addr) {
var waitAddr = Atomics.load(HEAP32, futex >> 2);
if (waitAddr == addr) {
#if ASSERTIONS
// We only use mainThreadFutex on the main browser thread, where we
// cannot block while we wait. Therefore we should only see it set from
// other threads, and not on the main thread itself. In other words, the
// main thread must never try to wake itself up!
assert(!ENVIRONMENT_IS_WEB && !ENVIRONMENT_IS_WORKLET);
#endif
var loadedAddr = Atomics.compareExchange(HEAP32, PThread.mainThreadFutex >> 2, mainThreadWaitAddress, 0);
if (loadedAddr == mainThreadWaitAddress) {
--count;
mainThreadWoken = 1;
if (count <= 0) return 1;
}
}

// See if a worklet is waiting on this address? If so, wake it up by resetting its wake location to zero.
// Note that this is not a fair procedure, since we always wake worklets before any workers, so
// this scheme does not adhere to real queue-based waiting.
#if ASSERTIONS
assert(PThread.workletFutex > 0);
#endif
var workletWaitAddress = Atomics.load(HEAP32, PThread.workletFutex >> 2);
var workletWoken = 0;
if (workletWaitAddress == addr) {
#if ASSERTIONS
// We only use workletFutex in worklets, where we cannot block while we wait.
// Therefore we should only see it set from other threads, and not in worklets
// themselves. In other words, a worklet must never try to wake itself up!
assert(!ENVIRONMENT_IS_WORKLET);
#endif
var loadedAddr = Atomics.compareExchange(HEAP32, PThread.workletFutex >> 2, workletWaitAddress, 0);
if (loadedAddr == workletWaitAddress) {
--count;
workletWoken = 1;
if (count <= 0) return 1;
// We only use mainThreadFutex on the main browser thread, where we
// cannot block while we wait. Therefore we should only see it set from
// other threads, and not on the main thread itself. In other words, the
// main thread must never try to wake itself up!
assert(futex != PThread.mainThreadFutex || !ENVIRONMENT_IS_WEB);
#endif
var loadedAddr = Atomics.compareExchange(HEAP32, futex >> 2, waitAddr, 0);
if (loadedAddr == waitAddr) {
--count;
spinFutexesWoken = 1;
if (count <= 0) return 1;
}
}
}

// Wake any workers waiting on this address.
var ret = Atomics.notify(HEAP32, addr >> 2, count);
if (ret >= 0) return ret + mainThreadWoken + workletWoken;
if (ret >= 0) return ret + spinFutexesWoken;
throw 'Atomics.notify returned an unexpected value ' + ret;
},

Expand Down
1 change: 1 addition & 0 deletions src/settings.js
Original file line number Diff line number Diff line change
Expand Up @@ -518,6 +518,7 @@ var LEGACY_VM_SUPPORT = 0;
// 'webview' - just like web, but in a webview like Cordova;
// considered to be same as "web" in almost every place
// 'worker' - a web worker environment.
// 'worklet' - an (audio) worklet environment.
// 'node' - Node.js.
// 'shell' - a JS shell like d8, js, or jsc.
// Or it can be a comma-separated list of them, e.g., "web,worker". If this is
Expand Down
1 change: 1 addition & 0 deletions src/settings_internal.js
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ var ENVIRONMENT_MAY_BE_WORKER = 1;
var ENVIRONMENT_MAY_BE_NODE = 1;
var ENVIRONMENT_MAY_BE_SHELL = 1;
var ENVIRONMENT_MAY_BE_WEBVIEW = 1;
var ENVIRONMENT_MAY_BE_WORKLET = 1;

// Whether to minify import and export names in the minify_wasm_js stage.
var MINIFY_WASM_IMPORTS_AND_EXPORTS = 0;
Expand Down
4 changes: 2 additions & 2 deletions src/shell.js
Original file line number Diff line number Diff line change
Expand Up @@ -95,11 +95,11 @@ var ENVIRONMENT_IS_SHELL = false;
var ENVIRONMENT_IS_WORKLET = false;
ENVIRONMENT_IS_WEB = typeof window === 'object';
ENVIRONMENT_IS_WORKER = typeof importScripts === 'function';
ENVIRONMENT_IS_WORKLET = typeof AudioWorkletGlobalScope === 'function';
// N.b. Electron.js environment is simultaneously a NODE-environment, but
// also a web environment.
ENVIRONMENT_IS_NODE = typeof process === 'object' && typeof process.versions === 'object' && typeof process.versions.node === 'string';
ENVIRONMENT_IS_SHELL = !ENVIRONMENT_IS_WEB && !ENVIRONMENT_IS_NODE && !ENVIRONMENT_IS_WORKER;
ENVIRONMENT_IS_WORKLET = typeof AudioWorkletGlobalScope === 'function';
ENVIRONMENT_IS_SHELL = !ENVIRONMENT_IS_WEB && !ENVIRONMENT_IS_NODE && !ENVIRONMENT_IS_WORKER && !ENVIRONMENT_IS_WORKLET;
#endif // ENVIRONMENT

#if ASSERTIONS
Expand Down
4 changes: 3 additions & 1 deletion src/worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,7 @@ if (typeof process === 'object' && typeof process.versions === 'object' && typeo
}
#endif // ENVIRONMENT_MAY_BE_NODE

#if ENVIRONMENT_MAY_BE_WORKLET
if (typeof AudioWorkletGlobalScope === "function") {
// Polyfill performance.now() since it's missing in worklets, falling back to Date.now()
if (!globalObj['performance']) {
Expand All @@ -360,4 +361,5 @@ if (typeof AudioWorkletGlobalScope === "function") {
}

registerProcessor('pthread-dummy-processor', PThreadDummyProcessor);
}
}
#endif
4 changes: 2 additions & 2 deletions tests/test_browser.py
Original file line number Diff line number Diff line change
Expand Up @@ -4885,15 +4885,15 @@ def test_manual_pthread_proxy_hammer(self, args):
# don't run this with the default extra_tries value, as this is
# *meant* to notice something random, a race condition.
extra_tries=0)

# Tests audio worklets
@requires_threads
@requires_sound_hardware
def test_audio_worklet(self):
self.btest(path_from_root('tests', 'audioworklet', 'audioworklet.cpp'),
expected='1',
args=['-s', 'USE_PTHREADS=1', '-s', 'MODULARIZE=1', '-s',
'EXPORT_NAME=AudioWorkletSample', '-s', 'ENVIRONMENT=web,worker',
'EXPORT_NAME=AudioWorkletSample', '-s', 'ENVIRONMENT=web,worker,worklet',
'--extern-post-js', path_from_root('tests', 'audioworklet', 'audioworklet_post.js'),
'--shell-file', path_from_root('tests', 'audioworklet', 'shell.html')])

Expand Down

0 comments on commit 1b1512a

Please sign in to comment.