Skip to content

Commit

Permalink
Merge pull request #3084 from rsofaer/uv_shutdown_in_julia_close
Browse files Browse the repository at this point in the history
RFC: uv_shutdown in julia close
  • Loading branch information
Keno committed May 13, 2013
2 parents c8cd71d + 4369e52 commit 6da4a71
Show file tree
Hide file tree
Showing 5 changed files with 59 additions and 20 deletions.
1 change: 1 addition & 0 deletions base/exports.jl
Original file line number Diff line number Diff line change
Expand Up @@ -1002,6 +1002,7 @@ export
bind,
connect,
close,
isopen,
countlines,
readcsv,
writecsv,
Expand Down
8 changes: 7 additions & 1 deletion base/stream.jl
Original file line number Diff line number Diff line change
Expand Up @@ -558,10 +558,16 @@ function link_pipe(read_end::Ptr{Void},readable_julia_only::Bool,write_end::Name
end
close_pipe_sync(handle::UVHandle) = ccall(:uv_pipe_close_sync,Void,(UVHandle,),handle)

function isopen(stream::AsyncStream)
stream.open
end

_uv_hook_isopen(stream::AsyncStream) = int32(isopen(stream))

function close(stream::AsyncStream)
if stream.open
stream.open = false
ccall(:jl_close_uv,Void,(Ptr{Void},),stream.handle)
stream.open = false
end
end

Expand Down
20 changes: 3 additions & 17 deletions src/init.c
Original file line number Diff line number Diff line change
Expand Up @@ -246,12 +246,7 @@ void sigint_handler(int sig, siginfo_t *info, void *context)

struct uv_shutdown_queue_item { uv_handle_t *h; struct uv_shutdown_queue_item *next; };
struct uv_shutdown_queue { struct uv_shutdown_queue_item *first; struct uv_shutdown_queue_item *last; };
static void jl_shutdown_uv_cb(uv_shutdown_t* req, int status)
{
//if (status == 0)
jl_close_uv((uv_handle_t*)req->handle);
free(req);
}

static void jl_uv_exitcleanup_add(uv_handle_t* handle, struct uv_shutdown_queue *queue)
{
struct uv_shutdown_queue_item *item = malloc(sizeof(struct uv_shutdown_queue_item));
Expand Down Expand Up @@ -300,17 +295,8 @@ DLLEXPORT void uv_atexit_hook()
//#endif
case UV_TCP:
case UV_NAMED_PIPE:
if (uv_is_writable((uv_stream_t*)handle)) { // uv_shutdown returns an error if not writable
uv_shutdown_t *req = malloc(sizeof(uv_shutdown_t));
int err = uv_shutdown(req, (uv_stream_t*)handle, jl_shutdown_uv_cb);
if (err != 0) {
printf("shutdown err: %s\n", uv_strerror(uv_last_error(jl_global_event_loop())));
jl_close_uv(handle);
}
}
else {
jl_close_uv(handle);
}
// These will be shut down in jl_close_uv.
jl_close_uv(handle);
break;
//Don't close these directly, but rather let the GC take care of it
case UV_POLL:
Expand Down
33 changes: 31 additions & 2 deletions src/jl_uv.c
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ extern "C" {
XX(getaddrinfo) \
XX(pollcb) \
XX(fspollcb) \
XX(isopen) \
XX(fseventscb)
//TODO add UDP and other missing callbacks

Expand All @@ -69,6 +70,7 @@ DLLEXPORT void jl_get_uv_hooks()
int base_module_conflict = 0; //set to 1 if Base is getting redefined since it means there are two place to try the callbacks
// warning: this is defined without the standard do {...} while (0) wrapper, since I wanted ret to escape
// warning: during bootstrapping, callbacks will be called twice if a MethodError occured at ANY time during callback call
// Use: JULIA_CB(hook, arg1, numberOfAdditionalArgs, arg2Type, arg2, ..., argNType, argN)
#define JULIA_CB(hook,args...) \
jl_value_t *ret; \
if (!base_module_conflict) { \
Expand Down Expand Up @@ -131,6 +133,12 @@ void closeHandle(uv_handle_t* handle)
free(handle);
}

void shutdownCallback(uv_shutdown_t* req, int status)
{
uv_close((uv_handle_t*) req->handle,&closeHandle);
free(req);
}

void jl_return_spawn(uv_process_t *p, int exit_status, int term_signal)
{
JULIA_CB(return_spawn,p->data,2,CB_INT32,exit_status,CB_INT32,term_signal);
Expand Down Expand Up @@ -286,11 +294,32 @@ DLLEXPORT uv_pipe_t *jl_init_pipe(uv_pipe_t *pipe, int writable, int julia_only,

DLLEXPORT void jl_close_uv(uv_handle_t *handle)
{
if (!handle)
if (!handle || uv_is_closing(handle))
return;
if (handle->type==UV_TTY)
uv_tty_set_mode((uv_tty_t*)handle,0);
uv_close(handle,&closeHandle);

if ( (handle->type == UV_NAMED_PIPE || handle->type == UV_TCP) && uv_is_writable( (uv_stream_t *) handle)) {
// Make sure that the stream has not already been marked closed in Julia.
// A double shutdown would cause the process to hang on exit.
JULIA_CB(isopen, handle->data, 0);
if (!jl_is_int32(ret)) {
jl_error("jl_close_uv: _uv_hook_isopen must return an int32.");
}
if (!jl_unbox_int32(ret)){
return;
}

uv_shutdown_t *req = malloc(sizeof(uv_shutdown_t));
int err = uv_shutdown(req, (uv_stream_t*)handle, &shutdownCallback);
if (err != 0) {
printf("shutdown err: %s\n", uv_strerror(uv_last_error(jl_global_event_loop())));
uv_close(handle, &closeHandle);
}
}
else {
uv_close(handle,&closeHandle);
}
}

DLLEXPORT void jl_uv_associate_julia_struct(uv_handle_t *handle, jl_value_t *data)
Expand Down
17 changes: 17 additions & 0 deletions test/spawn.jl
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,20 @@ if false
@test success(ignorestatus(`false` | `false`))
@test success(ignorestatus(`false` & `false`))
end

# Here we test that if we close a stream with pending writes, we don't lose the writes.
str = ""
for i=1:1000
str = "$str\n $(randstring(10))"
end
stdout, stdin, proc = readandwrite(`cat -`)
write(stdin, str)
close(stdin)
str2 = readall(stdout)
@test str2 == str

# This test hangs if the end of run walk across uv streams calls shutdown on a stream that is shutting down.
file = tempname()
stdout, stdin, proc = readandwrite(`cat -` > file)
write(stdin, str)
close(stdin)

0 comments on commit 6da4a71

Please sign in to comment.