Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert "Reland "IO: tie lifetime of handle field to container"" #45056

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 14 additions & 14 deletions base/asyncevent.jl
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ Use [`isopen`](@ref) to check whether it is still active.
This provides an implicit acquire & release memory ordering between the sending and waiting threads.
"""
mutable struct AsyncCondition
@atomic handle::Ptr{Cvoid}
handle::Ptr{Cvoid}
cond::ThreadSynchronizer
@atomic isopen::Bool
isopen::Bool
@atomic set::Bool

function AsyncCondition()
Expand Down Expand Up @@ -86,9 +86,9 @@ once. When the timer is closed (by [`close`](@ref)) waiting tasks are woken with

"""
mutable struct Timer
@atomic handle::Ptr{Cvoid}
handle::Ptr{Cvoid}
cond::ThreadSynchronizer
@atomic isopen::Bool
isopen::Bool
@atomic set::Bool

function Timer(timeout::Real; interval::Real = 0.0)
Expand Down Expand Up @@ -157,12 +157,12 @@ function wait(t::Union{Timer, AsyncCondition})
end


isopen(t::Union{Timer, AsyncCondition}) = t.isopen && t.handle != C_NULL
isopen(t::Union{Timer, AsyncCondition}) = t.isopen

function close(t::Union{Timer, AsyncCondition})
iolock_begin()
if isopen(t)
@atomic :monotonic t.isopen = false
if t.handle != C_NULL && isopen(t)
t.isopen = false
ccall(:jl_close_uv, Cvoid, (Ptr{Cvoid},), t)
end
iolock_end()
Expand All @@ -174,12 +174,12 @@ function uvfinalize(t::Union{Timer, AsyncCondition})
lock(t.cond)
try
if t.handle != C_NULL
disassociate_julia_struct(t.handle) # not going to call the usual close hooks anymore
disassociate_julia_struct(t.handle) # not going to call the usual close hooks
if t.isopen
@atomic :monotonic t.isopen = false
ccall(:jl_close_uv, Cvoid, (Ptr{Cvoid},), t.handle)
t.isopen = false
ccall(:jl_close_uv, Cvoid, (Ptr{Cvoid},), t)
end
@atomic :monotonic t.handle = C_NULL
t.handle = C_NULL
notify(t.cond, false)
end
finally
Expand All @@ -192,9 +192,9 @@ end
function _uv_hook_close(t::Union{Timer, AsyncCondition})
lock(t.cond)
try
@atomic :monotonic t.isopen = false
Libc.free(@atomicswap :monotonic t.handle = C_NULL)
notify(t.cond, false)
t.isopen = false
t.handle = C_NULL
notify(t.cond, t.set)
finally
unlock(t.cond)
end
Expand Down
7 changes: 2 additions & 5 deletions base/libuv.jl
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,8 @@ function preserve_handle(x)
end
function unpreserve_handle(x)
lock(preserve_handle_lock)
v = get(uvhandles, x, 0)::Int
if v == 0
unlock(preserve_handle_lock)
error("unbalanced call to unpreserve_handle for $(typeof(x))")
elseif v == 1
v = uvhandles[x]::Int
if v == 1
pop!(uvhandles, x)
else
uvhandles[x] = v - 1
Expand Down
8 changes: 4 additions & 4 deletions base/process.jl
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ function uv_return_spawn(p::Ptr{Cvoid}, exit_status::Int64, termsignal::Int32)
proc = unsafe_pointer_to_objref(data)::Process
proc.exitcode = exit_status
proc.termsignal = termsignal
disassociate_julia_struct(proc.handle) # ensure that data field is set to C_NULL
disassociate_julia_struct(proc) # ensure that data field is set to C_NULL
ccall(:jl_close_uv, Cvoid, (Ptr{Cvoid},), proc.handle)
proc.handle = C_NULL
lock(proc.exitnotify)
Expand All @@ -70,7 +70,7 @@ end

# called when the libuv handle is destroyed
function _uv_hook_close(proc::Process)
Libc.free(@atomicswap :not_atomic proc.handle = C_NULL)
proc.handle = C_NULL
nothing
end

Expand Down Expand Up @@ -607,10 +607,10 @@ Get the child process ID, if it still exists.
This function requires at least Julia 1.1.
"""
function Libc.getpid(p::Process)
# TODO: due to threading, this method is only weakly synchronized with the user application
# TODO: due to threading, this method is no longer synchronized with the user application
iolock_begin()
ppid = Int32(0)
if p.handle != C_NULL # e.g. process_running
if p.handle != C_NULL
ppid = ccall(:jl_uv_process_pid, Int32, (Ptr{Cvoid},), p.handle)
end
iolock_end()
Expand Down
24 changes: 11 additions & 13 deletions base/stream.jl
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,7 @@ if OS_HANDLE != RawFD
end

function isopen(x::Union{LibuvStream, LibuvServer})
if x.status == StatusUninit || x.status == StatusInit || x.handle === C_NULL
if x.status == StatusUninit || x.status == StatusInit
throw(ArgumentError("$x is not initialized"))
end
return x.status != StatusClosed
Expand Down Expand Up @@ -496,37 +496,34 @@ end

function close(stream::Union{LibuvStream, LibuvServer})
iolock_begin()
should_wait = false
if stream.status == StatusInit
ccall(:jl_forceclose_uv, Cvoid, (Ptr{Cvoid},), stream.handle)
stream.status = StatusClosing
elseif isopen(stream)
should_wait = uv_handle_data(stream) != C_NULL
if stream.status != StatusClosing
ccall(:jl_close_uv, Cvoid, (Ptr{Cvoid},), stream.handle)
stream.status = StatusClosing
end
end
iolock_end()
wait_close(stream)
should_wait && wait_close(stream)
nothing
end

function uvfinalize(uv::Union{LibuvStream, LibuvServer})
uv.handle == C_NULL && return
iolock_begin()
if uv.handle != C_NULL
disassociate_julia_struct(uv.handle) # not going to call the usual close hooks (so preserve_handle is not needed)
if uv.status == StatusUninit
Libc.free(uv.handle)
elseif uv.status == StatusInit
ccall(:jl_forceclose_uv, Cvoid, (Ptr{Cvoid},), uv.handle)
elseif isopen(uv)
if uv.status != StatusClosing
ccall(:jl_close_uv, Cvoid, (Ptr{Cvoid},), uv.handle)
end
elseif uv.status == StatusClosed
disassociate_julia_struct(uv.handle) # not going to call the usual close hooks
if uv.status != StatusUninit
close(uv)
else
Libc.free(uv.handle)
end
uv.handle = C_NULL
uv.status = StatusClosed
uv.handle = C_NULL
end
iolock_end()
nothing
Expand Down Expand Up @@ -716,6 +713,7 @@ end
function _uv_hook_close(uv::Union{LibuvStream, LibuvServer})
lock(uv.cond)
try
uv.handle = C_NULL
uv.status = StatusClosed
# notify any listeners that exist on this libuv stream type
notify(uv.cond)
Expand Down
3 changes: 2 additions & 1 deletion src/init.c
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,8 @@ static void jl_close_item_atexit(uv_handle_t *handle)
switch(handle->type) {
case UV_PROCESS:
// cause Julia to forget about the Process object
handle->data = NULL;
if (handle->data)
jl_uv_call_close_callback((jl_value_t*)handle->data);
// and make libuv think it is already dead
((uv_process_t*)handle)->pid = 0;
// fall-through
Expand Down
54 changes: 27 additions & 27 deletions src/jl_uv.c
Original file line number Diff line number Diff line change
Expand Up @@ -77,16 +77,14 @@ JL_DLLEXPORT void jl_iolock_end(void)
}


static void jl_uv_call_close_callback(jl_value_t *val)
void jl_uv_call_close_callback(jl_value_t *val)
{
jl_value_t **args;
JL_GC_PUSHARGS(args, 2); // val is "rooted" in the finalizer list only right now
jl_value_t *args[2];
args[0] = jl_get_global(jl_base_relative_to(((jl_datatype_t*)jl_typeof(val))->name->module),
jl_symbol("_uv_hook_close")); // topmod(typeof(val))._uv_hook_close
args[1] = val;
assert(args[0]);
jl_apply(args, 2); // TODO: wrap in try-catch?
JL_GC_POP();
}

static void jl_uv_closeHandle(uv_handle_t *handle)
Expand All @@ -107,7 +105,6 @@ static void jl_uv_closeHandle(uv_handle_t *handle)
ct->world_age = jl_atomic_load_acquire(&jl_world_counter);
jl_uv_call_close_callback((jl_value_t*)handle->data);
ct->world_age = last_age;
return;
}
if (handle == (uv_handle_t*)&signal_async)
return;
Expand All @@ -128,10 +125,6 @@ static void jl_uv_flush_close_callback(uv_write_t *req, int status)
free(req);
return;
}
if (uv_is_closing((uv_handle_t*)stream)) { // avoid double-close on the stream
free(req);
return;
}
if (status == 0 && uv_is_writable(stream) && stream->write_queue_size != 0) {
// new data was written, wait for it to flush too
uv_buf_t buf;
Expand All @@ -141,10 +134,12 @@ static void jl_uv_flush_close_callback(uv_write_t *req, int status)
if (uv_write(req, stream, &buf, 1, (uv_write_cb)jl_uv_flush_close_callback) == 0)
return; // success
}
if (!uv_is_closing((uv_handle_t*)stream)) { // avoid double-close on the stream
if (stream->type == UV_TTY)
uv_tty_set_mode((uv_tty_t*)stream, UV_TTY_MODE_NORMAL);
uv_close((uv_handle_t*)stream, &jl_uv_closeHandle);
}
free(req);
if (stream->type == UV_TTY)
uv_tty_set_mode((uv_tty_t*)stream, UV_TTY_MODE_NORMAL);
uv_close((uv_handle_t*)stream, &jl_uv_closeHandle);
}

static void uv_flush_callback(uv_write_t *req, int status)
Expand Down Expand Up @@ -229,42 +224,47 @@ static void jl_proc_exit_cleanup_cb(uv_process_t *process, int64_t exit_status,

JL_DLLEXPORT void jl_close_uv(uv_handle_t *handle)
{
JL_UV_LOCK();
if (handle->type == UV_PROCESS && ((uv_process_t*)handle)->pid != 0) {
// take ownership of this handle,
// so we can waitpid for the resource to exit and avoid leaving zombies
assert(handle->data == NULL); // make sure Julia has forgotten about it already
((uv_process_t*)handle)->exit_cb = jl_proc_exit_cleanup_cb;
uv_unref(handle);
return;
}
else if (handle->type == UV_FILE) {
JL_UV_LOCK();
if (handle->type == UV_FILE) {
uv_fs_t req;
jl_uv_file_t *fd = (jl_uv_file_t*)handle;
if ((ssize_t)fd->file != -1) {
uv_fs_close(handle->loop, &req, fd->file, NULL);
fd->file = (uv_os_fd_t)(ssize_t)-1;
}
jl_uv_closeHandle(handle); // synchronous (ok since the callback is known to not interact with any global state)
JL_UV_UNLOCK();
return;
}
else if (!uv_is_closing(handle)) { // avoid double-closing the stream
if (handle->type == UV_NAMED_PIPE || handle->type == UV_TCP || handle->type == UV_TTY) {
// flush the stream write-queue first
uv_write_t *req = (uv_write_t*)malloc_s(sizeof(uv_write_t));
req->handle = (uv_stream_t*)handle;
jl_uv_flush_close_callback(req, 0);
}
else {
uv_close(handle, &jl_uv_closeHandle);
}

if (handle->type == UV_NAMED_PIPE || handle->type == UV_TCP || handle->type == UV_TTY) {
uv_write_t *req = (uv_write_t*)malloc_s(sizeof(uv_write_t));
req->handle = (uv_stream_t*)handle;
jl_uv_flush_close_callback(req, 0);
JL_UV_UNLOCK();
return;
}

// avoid double-closing the stream
if (!uv_is_closing(handle)) {
uv_close(handle, &jl_uv_closeHandle);
}
JL_UV_UNLOCK();
}

JL_DLLEXPORT void jl_forceclose_uv(uv_handle_t *handle)
{
if (!uv_is_closing(handle)) { // avoid double-closing the stream
// avoid double-closing the stream
if (!uv_is_closing(handle)) {
JL_UV_LOCK();
if (!uv_is_closing(handle)) { // double-check
if (!uv_is_closing(handle)) {
uv_close(handle, &jl_uv_closeHandle);
}
JL_UV_UNLOCK();
Expand Down
1 change: 1 addition & 0 deletions src/julia_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -569,6 +569,7 @@ JL_DLLEXPORT jl_fptr_args_t jl_get_builtin_fptr(jl_value_t *b);

extern uv_loop_t *jl_io_loop;
void jl_uv_flush(uv_stream_t *stream);
void jl_uv_call_close_callback(jl_value_t *val);

typedef struct jl_typeenv_t {
jl_tvar_t *var;
Expand Down
Loading