Skip to content

Commit

Permalink
src: align worker and main thread code with embedder API
Browse files Browse the repository at this point in the history
This addresses some long-standing TODOs by Joyee and me about
making the embedder API more powerful and us less reliant on
internal APIs for creating the main thread and Workers.

PR-URL: #30467
Reviewed-By: James M Snell <[email protected]>
Reviewed-By: Gireesh Punathil <[email protected]>
  • Loading branch information
addaleax committed Mar 21, 2020
1 parent 084c379 commit a9fb51f
Show file tree
Hide file tree
Showing 12 changed files with 232 additions and 117 deletions.
88 changes: 82 additions & 6 deletions src/api/environment.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@
#include "node_v8_platform-inl.h"
#include "uv.h"

#if HAVE_INSPECTOR
#include "inspector/worker_inspector.h" // ParentInspectorHandle
#endif

namespace node {
using errors::TryCatchScope;
using v8::Array;
Expand Down Expand Up @@ -332,26 +336,40 @@ Environment* CreateEnvironment(IsolateData* isolate_data,
const char* const* argv,
int exec_argc,
const char* const* exec_argv) {
return CreateEnvironment(
isolate_data, context,
std::vector<std::string>(argv, argv + argc),
std::vector<std::string>(exec_argv, exec_argv + exec_argc));
}

Environment* CreateEnvironment(
IsolateData* isolate_data,
Local<Context> context,
const std::vector<std::string>& args,
const std::vector<std::string>& exec_args,
EnvironmentFlags::Flags flags,
ThreadId thread_id) {
Isolate* isolate = context->GetIsolate();
HandleScope handle_scope(isolate);
Context::Scope context_scope(context);
// TODO(addaleax): This is a much better place for parsing per-Environment
// options than the global parse call.
std::vector<std::string> args(argv, argv + argc);
std::vector<std::string> exec_args(exec_argv, exec_argv + exec_argc);
// TODO(addaleax): Provide more sensible flags, in an embedder-accessible way.
Environment* env = new Environment(
isolate_data,
context,
args,
exec_args,
static_cast<Environment::Flags>(Environment::kOwnsProcessState |
Environment::kOwnsInspector));
env->InitializeLibuv(per_process::v8_is_profiling);
flags,
thread_id);
if (flags & EnvironmentFlags::kOwnsProcessState) {
env->set_abort_on_uncaught_exception(false);
}

if (env->RunBootstrapping().IsEmpty()) {
FreeEnvironment(env);
return nullptr;
}

return env;
}

Expand All @@ -376,6 +394,58 @@ void FreeEnvironment(Environment* env) {
delete env;
}

InspectorParentHandle::~InspectorParentHandle() {}

// Hide the internal handle class from the public API.
#if HAVE_INSPECTOR
struct InspectorParentHandleImpl : public InspectorParentHandle {
std::unique_ptr<inspector::ParentInspectorHandle> impl;

explicit InspectorParentHandleImpl(
std::unique_ptr<inspector::ParentInspectorHandle>&& impl)
: impl(std::move(impl)) {}
};
#endif

NODE_EXTERN std::unique_ptr<InspectorParentHandle> GetInspectorParentHandle(
Environment* env,
ThreadId thread_id,
const char* url) {
CHECK_NOT_NULL(env);
CHECK_NE(thread_id.id, static_cast<uint64_t>(-1));
#if HAVE_INSPECTOR
return std::make_unique<InspectorParentHandleImpl>(
env->inspector_agent()->GetParentHandle(thread_id.id, url));
#else
return {};
#endif
}

void LoadEnvironment(Environment* env) {
USE(LoadEnvironment(env, {}));
}

MaybeLocal<Value> LoadEnvironment(
Environment* env,
std::unique_ptr<InspectorParentHandle> inspector_parent_handle) {
env->InitializeLibuv(per_process::v8_is_profiling);
env->InitializeDiagnostics();

#if HAVE_INSPECTOR
if (inspector_parent_handle) {
env->InitializeInspector(
std::move(static_cast<InspectorParentHandleImpl*>(
inspector_parent_handle.get())->impl));
} else {
env->InitializeInspector({});
}
#endif

// TODO(joyeecheung): Allow embedders to customize the entry
// point more directly without using _third_party_main.js
return StartExecution(env);
}

Environment* GetCurrentEnvironment(Local<Context> context) {
return Environment::GetCurrent(context);
}
Expand Down Expand Up @@ -592,4 +662,10 @@ void AddLinkedBinding(Environment* env,
AddLinkedBinding(env, mod);
}

static std::atomic<uint64_t> next_thread_id{0};

ThreadId AllocateEnvironmentThreadId() {
return ThreadId { next_thread_id++ };
}

} // namespace node
11 changes: 7 additions & 4 deletions src/env-inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -820,8 +820,9 @@ void Environment::SetImmediateThreadsafe(Fn&& cb) {
{
Mutex::ScopedLock lock(native_immediates_threadsafe_mutex_);
native_immediates_threadsafe_.Push(std::move(callback));
if (task_queues_async_initialized_)
uv_async_send(&task_queues_async_);
}
uv_async_send(&task_queues_async_);
}

template <typename Fn>
Expand All @@ -831,8 +832,9 @@ void Environment::RequestInterrupt(Fn&& cb) {
{
Mutex::ScopedLock lock(native_immediates_threadsafe_mutex_);
native_immediates_interrupts_.Push(std::move(callback));
if (task_queues_async_initialized_)
uv_async_send(&task_queues_async_);
}
uv_async_send(&task_queues_async_);
RequestInterruptFromV8();
}

Expand Down Expand Up @@ -893,11 +895,11 @@ inline bool Environment::is_main_thread() const {
}

inline bool Environment::owns_process_state() const {
return flags_ & kOwnsProcessState;
return flags_ & EnvironmentFlags::kOwnsProcessState;
}

inline bool Environment::owns_inspector() const {
return flags_ & kOwnsInspector;
return flags_ & EnvironmentFlags::kOwnsInspector;
}

bool Environment::filehandle_close_warning() const {
Expand Down Expand Up @@ -1226,6 +1228,7 @@ void Environment::RemoveCleanupHook(void (*fn)(void*), void* arg) {
inline void Environment::RegisterFinalizationGroupForCleanup(
v8::Local<v8::FinalizationGroup> group) {
cleanup_finalization_groups_.emplace_back(isolate(), group);
DCHECK(task_queues_async_initialized_);
uv_async_send(&task_queues_async_);
}

Expand Down
47 changes: 34 additions & 13 deletions src/env.cc
Original file line number Diff line number Diff line change
Expand Up @@ -232,12 +232,6 @@ void TrackingTraceStateObserver::UpdateTraceCategoryState() {
.ToLocalChecked();
}

static std::atomic<uint64_t> next_thread_id{0};

uint64_t Environment::AllocateThreadId() {
return next_thread_id++;
}

void Environment::CreateProperties() {
HandleScope handle_scope(isolate_);
Local<Context> ctx = context();
Expand Down Expand Up @@ -294,8 +288,8 @@ Environment::Environment(IsolateData* isolate_data,
Local<Context> context,
const std::vector<std::string>& args,
const std::vector<std::string>& exec_args,
Flags flags,
uint64_t thread_id)
EnvironmentFlags::Flags flags,
ThreadId thread_id)
: isolate_(context->GetIsolate()),
isolate_data_(isolate_data),
immediate_info_(context->GetIsolate()),
Expand All @@ -307,14 +301,23 @@ Environment::Environment(IsolateData* isolate_data,
should_abort_on_uncaught_toggle_(isolate_, 1),
stream_base_state_(isolate_, StreamBase::kNumStreamBaseStateFields),
flags_(flags),
thread_id_(thread_id == kNoThreadId ? AllocateThreadId() : thread_id),
thread_id_(thread_id.id == static_cast<uint64_t>(-1) ?
AllocateEnvironmentThreadId().id : thread_id.id),
fs_stats_field_array_(isolate_, kFsStatsBufferLength),
fs_stats_field_bigint_array_(isolate_, kFsStatsBufferLength),
context_(context->GetIsolate(), context) {
// We'll be creating new objects so make sure we've entered the context.
HandleScope handle_scope(isolate());
Context::Scope context_scope(context);

// Set some flags if only kDefaultFlags was passed. This can make API version
// transitions easier for embedders.
if (flags_ & EnvironmentFlags::kDefaultFlags) {
flags_ = flags_ |
EnvironmentFlags::kOwnsProcessState |
EnvironmentFlags::kOwnsInspector;
}

set_env_vars(per_process::system_environment);
enabled_debug_list_.Parse(this);

Expand All @@ -333,6 +336,10 @@ Environment::Environment(IsolateData* isolate_data,

AssignToContext(context, ContextInfo(""));

static uv_once_t init_once = UV_ONCE_INIT;
uv_once(&init_once, InitThreadLocalOnce);
uv_key_set(&thread_local_env, this);

if (tracing::AgentWriterHandle* writer = GetTracingAgentWriter()) {
trace_state_observer_ = std::make_unique<TrackingTraceStateObserver>(this);
if (TracingController* tracing_controller = writer->GetTracingController())
Expand Down Expand Up @@ -389,6 +396,9 @@ Environment::Environment(IsolateData* isolate_data,
Environment::~Environment() {
if (interrupt_data_ != nullptr) *interrupt_data_ = nullptr;

// FreeEnvironment() should have set this.
CHECK(is_stopping());

isolate()->GetHeapProfiler()->RemoveBuildEmbedderGraphCallback(
BuildEmbedderGraph, this);

Expand Down Expand Up @@ -472,6 +482,15 @@ void Environment::InitializeLibuv(bool start_profiler_idle_notifier) {
uv_unref(reinterpret_cast<uv_handle_t*>(&idle_check_handle_));
uv_unref(reinterpret_cast<uv_handle_t*>(&task_queues_async_));

{
Mutex::ScopedLock lock(native_immediates_threadsafe_mutex_);
task_queues_async_initialized_ = true;
if (native_immediates_threadsafe_.size() > 0 ||
native_immediates_interrupts_.size() > 0) {
uv_async_send(&task_queues_async_);
}
}

// Register clean-up cb to be called to clean up the handles
// when the environment is freed, note that they are not cleaned in
// the one environment per process setup, but will be called in
Expand All @@ -481,10 +500,6 @@ void Environment::InitializeLibuv(bool start_profiler_idle_notifier) {
if (start_profiler_idle_notifier) {
StartProfilerIdleNotifier();
}

static uv_once_t init_once = UV_ONCE_INIT;
uv_once(&init_once, InitThreadLocalOnce);
uv_key_set(&thread_local_env, this);
}

void Environment::ExitEnv() {
Expand Down Expand Up @@ -533,6 +548,11 @@ void Environment::RegisterHandleCleanups() {
}

void Environment::CleanupHandles() {
{
Mutex::ScopedLock lock(native_immediates_threadsafe_mutex_);
task_queues_async_initialized_ = false;
}

Isolate::DisallowJavascriptExecutionScope disallow_js(isolate(),
Isolate::DisallowJavascriptExecutionScope::THROW_ON_FAILURE);

Expand Down Expand Up @@ -1101,6 +1121,7 @@ void Environment::CleanupFinalizationGroups() {
if (try_catch.HasCaught() && !try_catch.HasTerminated())
errors::TriggerUncaughtException(isolate(), try_catch);
// Re-schedule the execution of the remainder of the queue.
CHECK(task_queues_async_initialized_);
uv_async_send(&task_queues_async_);
return;
}
Expand Down
20 changes: 8 additions & 12 deletions src/env.h
Original file line number Diff line number Diff line change
Expand Up @@ -857,12 +857,6 @@ class Environment : public MemoryRetainer {
inline void PushAsyncCallbackScope();
inline void PopAsyncCallbackScope();

enum Flags {
kNoFlags = 0,
kOwnsProcessState = 1 << 1,
kOwnsInspector = 1 << 2,
};

static inline Environment* GetCurrent(v8::Isolate* isolate);
static inline Environment* GetCurrent(v8::Local<v8::Context> context);
static inline Environment* GetCurrent(
Expand All @@ -881,8 +875,8 @@ class Environment : public MemoryRetainer {
v8::Local<v8::Context> context,
const std::vector<std::string>& args,
const std::vector<std::string>& exec_args,
Flags flags = Flags(),
uint64_t thread_id = kNoThreadId);
EnvironmentFlags::Flags flags,
ThreadId thread_id);
~Environment() override;

void InitializeLibuv(bool start_profiler_idle_notifier);
Expand Down Expand Up @@ -1051,9 +1045,6 @@ class Environment : public MemoryRetainer {
inline bool has_serialized_options() const;
inline void set_has_serialized_options(bool has_serialized_options);

static uint64_t AllocateThreadId();
static constexpr uint64_t kNoThreadId = -1;

inline bool is_main_thread() const;
inline bool owns_process_state() const;
inline bool owns_inspector() const;
Expand Down Expand Up @@ -1338,7 +1329,7 @@ class Environment : public MemoryRetainer {
bool has_serialized_options_ = false;

std::atomic_bool can_call_into_js_ { true };
Flags flags_;
uint64_t flags_;
uint64_t thread_id_;
std::unordered_set<worker::Worker*> sub_worker_contexts_;

Expand Down Expand Up @@ -1440,6 +1431,11 @@ class Environment : public MemoryRetainer {
Mutex native_immediates_threadsafe_mutex_;
NativeImmediateQueue native_immediates_threadsafe_;
NativeImmediateQueue native_immediates_interrupts_;
// Also guarded by native_immediates_threadsafe_mutex_. This can be used when
// trying to post tasks from other threads to an Environment, as the libuv
// handle for the immediate queues (task_queues_async_) may not be initialized
// yet or already have been destroyed.
bool task_queues_async_initialized_ = false;

void RunAndClearNativeImmediates(bool only_refed = false);
void RunAndClearInterrupts();
Expand Down
19 changes: 7 additions & 12 deletions src/node.cc
Original file line number Diff line number Diff line change
Expand Up @@ -198,8 +198,8 @@ MaybeLocal<Value> ExecuteBootstrapper(Environment* env,
int Environment::InitializeInspector(
std::unique_ptr<inspector::ParentInspectorHandle> parent_handle) {
std::string inspector_path;
bool is_main = !parent_handle;
if (parent_handle) {
DCHECK(!is_main_thread());
inspector_path = parent_handle->url();
inspector_agent_->SetParentHandle(std::move(parent_handle));
} else {
Expand All @@ -213,7 +213,7 @@ int Environment::InitializeInspector(
inspector_agent_->Start(inspector_path,
options_->debug_options(),
inspector_host_port(),
is_main_thread());
is_main);
if (options_->debug_options().inspector_enabled &&
!inspector_agent_->IsListening()) {
return 12; // Signal internal error
Expand Down Expand Up @@ -402,14 +402,18 @@ MaybeLocal<Value> StartExecution(Environment* env, const char* main_script_id) {
ExecuteBootstrapper(env, main_script_id, &parameters, &arguments));
}

MaybeLocal<Value> StartMainThreadExecution(Environment* env) {
MaybeLocal<Value> StartExecution(Environment* env) {
// To allow people to extend Node in different ways, this hook allows
// one to drop a file lib/_third_party_main.js into the build
// directory which will be executed instead of Node's normal loading.
if (NativeModuleEnv::Exists("_third_party_main")) {
return StartExecution(env, "internal/main/run_third_party_main");
}

if (env->worker_context() != nullptr) {
return StartExecution(env, "internal/main/worker_thread");
}

std::string first_argv;
if (env->argv().size() > 1) {
first_argv = env->argv()[1];
Expand Down Expand Up @@ -448,15 +452,6 @@ MaybeLocal<Value> StartMainThreadExecution(Environment* env) {
return StartExecution(env, "internal/main/eval_stdin");
}

void LoadEnvironment(Environment* env) {
CHECK(env->is_main_thread());
// TODO(joyeecheung): Not all of the execution modes in
// StartMainThreadExecution() make sense for embedders. Pick the
// useful ones out, and allow embedders to customize the entry
// point more directly without using _third_party_main.js
USE(StartMainThreadExecution(env));
}

#ifdef __POSIX__
typedef void (*sigaction_cb)(int signo, siginfo_t* info, void* ucontext);
#endif
Expand Down
Loading

0 comments on commit a9fb51f

Please sign in to comment.