diff --git a/base/exports.jl b/base/exports.jl index f7b59d768ccdc..44b01ba00c83d 100644 --- a/base/exports.jl +++ b/base/exports.jl @@ -1000,6 +1000,7 @@ export bind, connect, close, + isopen, countlines, readcsv, writecsv, diff --git a/base/stream.jl b/base/stream.jl index 8dfa8600545f5..1a4179bc3ade8 100644 --- a/base/stream.jl +++ b/base/stream.jl @@ -558,10 +558,16 @@ function link_pipe(read_end::Ptr{Void},readable_julia_only::Bool,write_end::Name end close_pipe_sync(handle::UVHandle) = ccall(:uv_pipe_close_sync,Void,(UVHandle,),handle) +function isopen(stream::AsyncStream) + stream.open +end + +_uv_hook_isopen(stream::AsyncStream) = int32(isopen(stream)) + function close(stream::AsyncStream) if stream.open - stream.open = false ccall(:jl_close_uv,Void,(Ptr{Void},),stream.handle) + stream.open = false end end diff --git a/src/init.c b/src/init.c index f08103b65ff2a..31590e96517d1 100644 --- a/src/init.c +++ b/src/init.c @@ -246,12 +246,7 @@ void sigint_handler(int sig, siginfo_t *info, void *context) struct uv_shutdown_queue_item { uv_handle_t *h; struct uv_shutdown_queue_item *next; }; struct uv_shutdown_queue { struct uv_shutdown_queue_item *first; struct uv_shutdown_queue_item *last; }; -static void jl_shutdown_uv_cb(uv_shutdown_t* req, int status) -{ - //if (status == 0) - jl_close_uv((uv_handle_t*)req->handle); - free(req); -} + static void jl_uv_exitcleanup_add(uv_handle_t* handle, struct uv_shutdown_queue *queue) { struct uv_shutdown_queue_item *item = malloc(sizeof(struct uv_shutdown_queue_item)); @@ -300,17 +295,8 @@ DLLEXPORT void uv_atexit_hook() //#endif case UV_TCP: case UV_NAMED_PIPE: - if (uv_is_writable((uv_stream_t*)handle)) { // uv_shutdown returns an error if not writable - uv_shutdown_t *req = malloc(sizeof(uv_shutdown_t)); - int err = uv_shutdown(req, (uv_stream_t*)handle, jl_shutdown_uv_cb); - if (err != 0) { - printf("shutdown err: %s\n", uv_strerror(uv_last_error(jl_global_event_loop()))); - jl_close_uv(handle); - } - } - else { - jl_close_uv(handle); - } + // These will be shut down in jl_close_uv. + jl_close_uv(handle); break; //Don't close these directly, but rather let the GC take care of it case UV_POLL: diff --git a/src/jl_uv.c b/src/jl_uv.c index dcd09b8cbc63d..6b5f742bf3010 100644 --- a/src/jl_uv.c +++ b/src/jl_uv.c @@ -49,6 +49,7 @@ extern "C" { XX(getaddrinfo) \ XX(pollcb) \ XX(fspollcb) \ + XX(isopen) \ XX(fseventscb) //TODO add UDP and other missing callbacks @@ -69,6 +70,7 @@ DLLEXPORT void jl_get_uv_hooks() int base_module_conflict = 0; //set to 1 if Base is getting redefined since it means there are two place to try the callbacks // warning: this is defined without the standard do {...} while (0) wrapper, since I wanted ret to escape // warning: during bootstrapping, callbacks will be called twice if a MethodError occured at ANY time during callback call +// Use: JULIA_CB(hook, arg1, numberOfAdditionalArgs, arg2Type, arg2, ..., argNType, argN) #define JULIA_CB(hook,args...) \ jl_value_t *ret; \ if (!base_module_conflict) { \ @@ -131,6 +133,12 @@ void closeHandle(uv_handle_t* handle) free(handle); } +void shutdownCallback(uv_shutdown_t* req, int status) +{ + uv_close((uv_handle_t*) req->handle,&closeHandle); + free(req); +} + void jl_return_spawn(uv_process_t *p, int exit_status, int term_signal) { JULIA_CB(return_spawn,p->data,2,CB_INT32,exit_status,CB_INT32,term_signal); @@ -286,11 +294,32 @@ DLLEXPORT uv_pipe_t *jl_init_pipe(uv_pipe_t *pipe, int writable, int julia_only, DLLEXPORT void jl_close_uv(uv_handle_t *handle) { - if (!handle) + if (!handle || uv_is_closing(handle)) return; if (handle->type==UV_TTY) uv_tty_set_mode((uv_tty_t*)handle,0); - uv_close(handle,&closeHandle); + + if ( (handle->type == UV_NAMED_PIPE || handle->type == UV_TCP) && uv_is_writable( (uv_stream_t *) handle)) { + // Make sure that the stream has not already been marked closed in Julia. + // A double shutdown would cause the process to hang on exit. + JULIA_CB(isopen, handle->data, 0); + if (!jl_is_int32(ret)) { + jl_error("jl_close_uv: _uv_hook_isopen must return an int32."); + } + if (!jl_unbox_int32(ret)){ + return; + } + + uv_shutdown_t *req = malloc(sizeof(uv_shutdown_t)); + int err = uv_shutdown(req, (uv_stream_t*)handle, &shutdownCallback); + if (err != 0) { + printf("shutdown err: %s\n", uv_strerror(uv_last_error(jl_global_event_loop()))); + uv_close(handle, &closeHandle); + } + } + else { + uv_close(handle,&closeHandle); + } } DLLEXPORT void jl_uv_associate_julia_struct(uv_handle_t *handle, jl_value_t *data) diff --git a/test/spawn.jl b/test/spawn.jl index f7c9f10ffd8a7..9a6646af8e564 100644 --- a/test/spawn.jl +++ b/test/spawn.jl @@ -41,3 +41,20 @@ if false @test success(ignorestatus(`false` | `false`)) @test success(ignorestatus(`false` & `false`)) end + +# Here we test that if we close a stream with pending writes, we don't lose the writes. +str = "" +for i=1:1000 + str = "$str\n $(randstring(10))" +end +stdout, stdin, proc = readandwrite(`cat -`) +write(stdin, str) +close(stdin) +str2 = readall(stdout) +@test str2 == str + +# This test hangs if the end of run walk across uv streams calls shutdown on a stream that is shutting down. +file = tempname() +stdout, stdin, proc = readandwrite(`cat -` > file) +write(stdin, str) +close(stdin)