diff --git a/source/libasync/dns.d b/source/libasync/dns.d index 4711605..f5e89c9 100644 --- a/source/libasync/dns.d +++ b/source/libasync/dns.d @@ -45,6 +45,7 @@ public: assert(false, "Failed to start DNS Signaling"); } m_cmdInfo.ready.run(cast(void delegate())&callback); + m_owner = cast(shared)Thread.getThis(); try m_cmdInfo.mtx = cast(shared) new Mutex; catch (Exception) {} } @@ -82,7 +83,8 @@ public: do { static if (LOG) .tracef("Resolving url: %s", url); static if (is_Windows || EPOLL) { - if (force_async) { + if (force_async) { + tracef("Resolving async with signal fd: %X", m_cmdInfo.ready.id); m_cmdInfo.command = DNSCmd.RESOLVEHOST; m_cmdInfo.ipv6 = ipv6; m_cmdInfo.url = cast(shared) url; @@ -176,6 +178,15 @@ package struct AsyncDNSRequest version(Windows) { import libasync.internals.win32; PADDRINFOEX infos; + AsyncOverlapped* overlapped; + ~this() { + try { + AsyncOverlapped.free(overlapped); + } catch (Exception e) { + import libasync.internals.logging; + static if (LOG) tracef("Exception freeing in AsyncDNSRequest: %s", e.toString()); + } + } } static if (EPOLL) { import libasync.internals.socket_compat : gaicb, sigevent, addrinfo; diff --git a/source/libasync/windows.d b/source/libasync/windows.d index 55ffcd2..32c2006 100644 --- a/source/libasync/windows.d +++ b/source/libasync/windows.d @@ -125,7 +125,7 @@ package: RegisterClassW(&wc); m_hwnd = CreateWindowW(wnz, clsn, 0, 0, 0, 385, 375, HWND_MESSAGE, cast(HMENU) null, null, null); - static if (LOG) try log("Window registered: " ~ m_hwnd.to!string); catch (Throwable) {} + static if (LOG) try trace("Window registered: " ~ m_hwnd.to!string); catch (Throwable) {} auto ptr = cast(ULONG_PTR)cast(void*)&this; SetWindowLongPtrA(m_hwnd, GWLP_USERDATA, ptr); assert( cast(EventLoopImpl*)cast(void*)GetWindowLongPtrA(m_hwnd, GWLP_USERDATA) is &this ); @@ -294,7 +294,7 @@ package: } if (catchErrors!"MsgWaitForMultipleObjectsEx"(signal, errors)) { - static if (LOG) log("Event Loop Exiting because of error"); + static if (LOG) trace("Event Loop Exiting because of error"); return false; } @@ -307,7 +307,7 @@ package: DispatchMessageW(&msg); if (m_status.code == Status.ERROR) { - static if (LOG) log(m_status.text); + static if (LOG) trace(m_status.text); return false; } } @@ -442,7 +442,7 @@ package: if (initTCPListener(fd, ctxt, reusing)) { try { - static if (LOG) log("Running listener on socket fd#" ~ fd.to!string); + static if (LOG) trace("Running listener on socket fd#" ~ fd.to!string); m_connHandlers[fd] = del; version(Distributed)ctxt.init(m_hwnd, fd); } @@ -472,7 +472,7 @@ package: if (fd == fd_t.init) fd = WSASocketW(cast(int)ctxt.peer.family, SOCK_STREAM, IPPROTO_TCP, null, 0, WSA_FLAG_OVERLAPPED); - static if (LOG) log("Starting connection at: " ~ fd.to!string); + static if (LOG) trace("Starting connection at: " ~ fd.to!string); if (catchSocketError!("run AsyncTCPConnection")(fd, INVALID_SOCKET)) return 0; @@ -487,7 +487,7 @@ package: nothrow void closeAll() { try { - static if (LOG) log("Remove event handler for " ~ fd.to!string); + static if (LOG) trace("Remove event handler for " ~ fd.to!string); m_tcpHandlers.remove(fd); } catch (Exception e) { @@ -509,7 +509,7 @@ package: } - static if (LOG) try log("Client started FD#" ~ fd.to!string); + static if (LOG) try trace("Client started FD#" ~ fd.to!string); catch (Throwable) {} return fd; } @@ -537,7 +537,7 @@ package: } else return 0; - static if (LOG) try log("UDP Socket started FD#" ~ fd.to!string); + static if (LOG) try trace("UDP Socket started FD#" ~ fd.to!string); catch (Throwable) {} return fd; @@ -545,13 +545,13 @@ package: fd_t run(shared AsyncSignal ctxt) { m_status = StatusInfo.init; - static if (LOG) try log("Signal subscribed to: " ~ m_hwnd.to!string); catch (Throwable) {} + static if (LOG) try trace("Signal subscribed to: " ~ m_hwnd.to!string); catch (Throwable) {} return (cast(fd_t)m_hwnd); } fd_t run(AsyncNotifier ctxt) { m_status = StatusInfo.init; - //static if (LOG) try log("Running signal " ~ (cast(AsyncNotifier)ctxt).to!string); catch (Throwable) {} + //static if (LOG) try trace("Running signal " ~ (cast(AsyncNotifier)ctxt).to!string); catch (Throwable) {} return cast(fd_t) m_hwnd; } @@ -563,7 +563,7 @@ package: if (timer_id == fd_t.init) { timer_id = createIndex(); } - static if (LOG) try log("Timer created: " ~ timer_id.to!string ~ " with timeout: " ~ timeout.total!"msecs".to!string ~ " msecs"); catch (Throwable) {} + static if (LOG) try trace("Timer created: " ~ timer_id.to!string ~ " with timeout: " ~ timeout.total!"msecs".to!string ~ " msecs"); catch (Throwable) {} BOOL err; try err = cast(int)SetTimer(m_hwnd, timer_id, timeout.total!"msecs".to!uint, null); @@ -646,7 +646,7 @@ package: m_status = StatusInfo.init; fd_t fd = ctxt.socket; - static if (LOG) log("Killing socket "~ fd.to!string); + static if (LOG) trace("Killing socket "~ fd.to!string); try { auto cb = m_tcpHandlers.get(ctxt.socket); if (cb != TCPEventHandler.init){ @@ -690,7 +690,7 @@ package: bool kill(AsyncTimer ctxt) { m_status = StatusInfo.init; - static if (LOG) try log("Kill timer" ~ ctxt.id.to!string); catch (Throwable) {} + static if (LOG) try trace("Kill timer" ~ ctxt.id.to!string); catch (Throwable) {} BOOL err = KillTimer(m_hwnd, ctxt.id); if (err == 0) @@ -953,7 +953,7 @@ package: import std.algorithm : min; size_t cnt = min(dst.length, changes.length); foreach (DWChangeInfo change; (*changes)[0 .. cnt]) { - static if (LOG) try log("reading change: " ~ change.path); catch (Throwable) {} + static if (LOG) try trace("reading change: " ~ change.path); catch (Throwable) {} dst[i] = (*changes)[i]; changes.removeFront(); i++; @@ -963,7 +963,7 @@ package: setInternalError!"watcher.readChanges"(Status.ERROR, "Could not read directory changes: " ~ e.msg); return 0; } - static if (LOG) try log("Changes returning with: " ~ i.to!string); catch (Throwable) {} + static if (LOG) try trace("Changes returning with: " ~ i.to!string); catch (Throwable) {} return cast(uint) i; } @@ -981,7 +981,7 @@ package: wd = cast(uint) hndl; DWHandlerInfo handler = m_dwHandlers.get(fd, DWHandlerInfo.init); assert(handler !is null); - static if (LOG) log("Watching: " ~ info.path.toNativeString()); + static if (LOG) trace("Watching: " ~ info.path.toNativeString()); auto dwWatcher = ThreadMem.alloc!DWFolderWatcher(m_evLoop, fd, hndl, info.path, info.events, handler, info.recursive); (m_dwFolders)[wd] = dwWatcher; @@ -1025,7 +1025,7 @@ package: err = PostMessageA(cast(HWND)fd, WM_USER_SIGNAL, wparam, lparam); else err = PostMessageA(cast(HWND)fd, WM_USER_EVENT, wparam, lparam); - static if (LOG) try log("Sending notification to: " ~ (cast(HWND)fd).to!string); catch (Throwable) {} + static if (LOG) try trace("Sending notification to: " ~ (cast(HWND)fd).to!string); catch (Throwable) {} if (err == 0) { m_error = GetLastErrorSafe(); @@ -1510,7 +1510,7 @@ package: m_status = StatusInfo.init; int ret = .recv(fd, cast(void*) data.ptr, cast(INT) data.length, 0); - //static if (LOG) try log("RECV " ~ ret.to!string ~ "B FD#" ~ fd.to!string); catch (Throwable) {} + //static if (LOG) try trace("RECV " ~ ret.to!string ~ "B FD#" ~ fd.to!string); catch (Throwable) {} if (catchSocketError!".recv"(ret)) { // ret == -1 if (m_error == error_t.WSAEWOULDBLOCK) m_status.code = Status.ASYNC; @@ -1526,7 +1526,7 @@ package: uint send(in fd_t fd, in void[] data) { m_status = StatusInfo.init; - static if (LOG) try log("SEND " ~ data.length.to!string ~ "B FD#" ~ fd.to!string); + static if (LOG) try trace("SEND " ~ data.length.to!string ~ "B FD#" ~ fd.to!string); catch (Throwable) {} int ret = .send(fd, cast(const(void)*) data.ptr, cast(INT) data.length, 0); @@ -1564,7 +1564,7 @@ package: addr.family = AF_INET; } - static if (LOG) try log("RECVFROM " ~ ret.to!string ~ "B"); catch (Throwable) {} + static if (LOG) try trace("RECVFROM " ~ ret.to!string ~ "B"); catch (Throwable) {} if (catchSocketError!".recvfrom"(ret)) { // ret == -1 if (m_error == WSAEWOULDBLOCK) m_status.code = Status.ASYNC; @@ -1580,7 +1580,7 @@ package: uint sendTo(in fd_t fd, in void[] data, in NetworkAddress addr) { m_status = StatusInfo.init; - static if (LOG) try log("SENDTO " ~ data.length.to!string ~ "B " ~ addr.toString()); catch (Throwable) {} + static if (LOG) try trace("SENDTO " ~ data.length.to!string ~ "B " ~ addr.toString()); catch (Throwable) {} int ret; if (addr != NetworkAddress.init) ret = .sendto(fd, cast(void*) data.ptr, cast(INT) data.length, 0, addr.sockAddr, addr.sockAddrLen); @@ -1624,7 +1624,7 @@ package: INT err; - static if (LOG) try log("Shutdown FD#" ~ fd.to!string); + static if (LOG) try trace("Shutdown FD#" ~ fd.to!string); catch (Throwable) {} if (forced) { err = shutdown(fd, SD_BOTH); @@ -1642,7 +1642,7 @@ package: } evh.conn = null; - //static if (LOG) log("Remove event handler for " ~ fd.to!string); + //static if (LOG) trace("Remove event handler for " ~ fd.to!string); m_tcpHandlers.remove(fd); } } @@ -1662,7 +1662,7 @@ package: if (!connected && forced) { try { if (fd in m_connHandlers) { - static if (LOG) log("Removing connection handler for: " ~ fd.to!string); + static if (LOG) trace("Removing connection handler for: " ~ fd.to!string); m_connHandlers.remove(fd); } } @@ -1715,7 +1715,7 @@ package: INT err = WSAStringToAddressW(str, cast(INT) addr.family, null, addr.sockAddr, &addrlen); if (port != 0) addr.port = port; - static if (LOG) try log(addr.toString()); + static if (LOG) try trace(addr.toString()); catch (Throwable) {} if( catchSocketError!"getAddressFromIP"(err) ) return NetworkAddress.init; @@ -1729,10 +1729,11 @@ package: debug assert(validateHost(host), "Trying to connect to an invalid domain"); } do */{ - trace("resolve"); + static if (LOG) trace("resolve()"); m_status = StatusInfo.init; auto overlapped = assumeWontThrow(AsyncOverlapped.alloc()); overlapped.resolve = ctx; + ctx.overlapped = overlapped; ADDRINFOEXW hints; PADDRINFOEX infos; hints.ai_flags |= AI_CANONNAME; @@ -1764,13 +1765,13 @@ package: return false; } timeval timeout = timeval(5,0); - trace("Getting addr info"); + static if (LOG) trace("Getting addr info"); error_t err = cast(error_t) GetAddrInfoExW(str, wPort, NS_DNS, NULL, &hints, &overlapped.resolve.infos, &timeout, cast(WSAOVERLAPPEDX*) overlapped, cast(LPLOOKUPSERVICE_COMPLETION_ROUTINE) &onOverlappedResolveComplete, NULL); if (err != WSA_IO_PENDING) { - setInternalError!"GetAddrInfoW"(Status.ABORT, string.init, err); + setInternalError!"GetAddrInfoExW"(Status.ABORT, string.init, err); onOverlappedResolveComplete(cast(INT)err, 0, overlapped); return false; } @@ -1831,7 +1832,7 @@ package: data[0 .. infos.ai_addrlen] = pAddr[0 .. infos.ai_addrlen]; // perform bit copy addr.family = cast(ushort)infos.ai_family; - static if (LOG) try log("GetAddrInfoW Successfully resolved DNS to: " ~ addr.toAddressString()); + static if (LOG) try trace("GetAddrInfoW Successfully resolved DNS to: " ~ addr.toAddressString()); catch (Exception e){} return addr; } @@ -1887,7 +1888,7 @@ private: } break; case WM_TIMER: - static if (LOG) try log("Timer callback: " ~ m_timer.fd.to!string); catch (Throwable) {} + static if (LOG) try trace("Timer callback: " ~ m_timer.fd.to!string); catch (Throwable) {} TimerHandler cb; bool cached = (m_timer.fd == cast(fd_t)msg.wParam); try { @@ -1909,7 +1910,7 @@ private: break; case WM_USER_EVENT: - static if (LOG) log("User event"); + static if (LOG) trace("User event"); ulong uwParam = cast(ulong)msg.wParam; ulong ulParam = cast(ulong)msg.lParam; @@ -1918,7 +1919,7 @@ private: void* payloadPtr = cast(void*) payloadAddr; shared AsyncSignal ctxt = cast(shared AsyncSignal) payloadPtr; - static if (LOG) try log("Got notification in : " ~ m_hwnd.to!string ~ " pointer: " ~ payloadPtr.to!string); catch (Throwable) {} + static if (LOG) try trace("Got notification in : " ~ m_hwnd.to!string ~ " pointer: " ~ payloadPtr.to!string); catch (Throwable) {} try { assert(ctxt.id != 0); ctxt.handler(); @@ -1928,7 +1929,7 @@ private: } break; case WM_USER_SIGNAL: - static if (LOG) log("User signal"); + static if (LOG) trace("User signal"); ulong uwParam = cast(ulong)msg.wParam; ulong ulParam = cast(ulong)msg.lParam; @@ -1974,7 +1975,7 @@ private: default: break; case FD_READ: try { - static if (LOG) log("READ FD#" ~ sock.to!string); + static if (LOG) trace("READ FD#" ~ sock.to!string); cb = m_udpHandlers.get(sock); assert(cb != UDPHandler.init, "Socket " ~ sock.to!string ~ " could not yield a callback"); cb(UDPEvent.READ); @@ -1986,7 +1987,7 @@ private: break; case FD_WRITE: try { - static if (LOG) log("WRITE FD#" ~ sock.to!string); + static if (LOG) trace("WRITE FD#" ~ sock.to!string); cb = m_udpHandlers.get(sock); assert(cb != UDPHandler.init, "Socket " ~ sock.to!string ~ " could not yield a callback"); cb(UDPEvent.WRITE); @@ -2013,7 +2014,7 @@ private: if (err) { setInternalError!"onTCPEvent"(Status.ERROR, string.init, cast(error_t)err); try { - //log("CLOSE FD#" ~ sock.to!string); + //trace("CLOSE FD#" ~ sock.to!string); (m_tcpHandlers)[sock](TCPEvent.ERROR); } catch (Throwable) { // can't do anything about this... } @@ -2026,8 +2027,8 @@ private: case FD_ACCEPT: version(Distributed) gs_mtx.lock_nothrow(); - static if (LOG) log("TCP Handlers: " ~ m_tcpHandlers.length.to!string); - static if (LOG) log("Accepting connection"); + static if (LOG) trace("TCP Handlers: " ~ m_tcpHandlers.length.to!string); + static if (LOG) trace("Accepting connection"); /// Let another listener take the next connection TCPAcceptHandler list; try list = m_connHandlers[sock]; catch (Throwable) { assert(false, "Listening on an invalid socket..."); } @@ -2046,7 +2047,7 @@ private: assert(false, "Could not set listener back to window HANDLE " ~ m_hwnd.to!string); } } - else static if (LOG) log("Returned init!!"); + else static if (LOG) trace("Returned init!!"); gs_mtx.unlock_nothrow(); } } @@ -2071,7 +2072,7 @@ private: if ( catchSocketError!"WSAAsyncSelect"(ok) ) return false; - static if (LOG) log("Connection accepted: " ~ csock.to!string); + static if (LOG) trace("Connection accepted: " ~ csock.to!string); AsyncTCPConnection conn; try conn = ThreadMem.alloc!AsyncTCPConnection(m_evLoop); @@ -2091,7 +2092,7 @@ private: try { m_tcpHandlers[csock] = cb; // keep the handler to setup the connection - static if (LOG) log("ACCEPT&CONNECT FD#" ~ csock.to!string); + static if (LOG) trace("ACCEPT&CONNECT FD#" ~ csock.to!string); *conn.connected = true; cb(TCPEvent.CONNECT); } @@ -2102,7 +2103,7 @@ private: break; case FD_CONNECT: try { - static if (LOG) log("CONNECT FD#" ~ sock.to!string); + static if (LOG) trace("CONNECT FD#" ~ sock.to!string); cb = m_tcpHandlers.get(sock); if (cb == TCPEventHandler.init) break;//, "Socket " ~ sock.to!string ~ " could not yield a callback"); *cb.conn.connecting = true; @@ -2114,19 +2115,19 @@ private: break; case FD_READ: try { - static if (LOG) log("READ FD#" ~ sock.to!string); + static if (LOG) trace("READ FD#" ~ sock.to!string); cb = m_tcpHandlers.get(sock); if (cb == TCPEventHandler.init) break; //, "Socket " ~ sock.to!string ~ " could not yield a callback"); if (!cb.conn) break; if (*cb.conn.connected == false && *cb.conn.connecting) { - static if (LOG) log("TCPEvent CONNECT FD#" ~ sock.to!string); + static if (LOG) trace("TCPEvent CONNECT FD#" ~ sock.to!string); *cb.conn.connecting = false; *cb.conn.connected = true; cb(TCPEvent.CONNECT); } else { - static if (LOG) log("TCPEvent READ FD#" ~ sock.to!string); + static if (LOG) trace("TCPEvent READ FD#" ~ sock.to!string); cb(TCPEvent.READ); } } @@ -2140,7 +2141,7 @@ private: try { //import std.stdio; - static if (LOG) log("WRITE FD#" ~ sock.to!string); + static if (LOG) trace("WRITE FD#" ~ sock.to!string); cb = m_tcpHandlers.get(sock); if (cb == TCPEventHandler.init) break;//assert(cb != TCPEventHandler.init, "Socket " ~ sock.to!string ~ " could not yield a callback"); if (!cb.conn) break; @@ -2163,7 +2164,7 @@ private: INT ret; bool connected = true; try { - static if (LOG) log("CLOSE FD#" ~ sock.to!string); + static if (LOG) trace("CLOSE FD#" ~ sock.to!string); if (sock in m_tcpHandlers) { cb = m_tcpHandlers.get(sock); if (*cb.conn.connected || *cb.conn.connecting) { @@ -2192,7 +2193,7 @@ private: bool initUDPSocket(fd_t fd, AsyncUDPSocket ctxt) { INT err; - static if (LOG) log("Binding to UDP " ~ ctxt.local.toString()); + static if (LOG) trace("Binding to UDP " ~ ctxt.local.toString()); if (!setOption(fd, TCPOption.REUSEADDR, true)) { closesocket(fd); @@ -2689,20 +2690,21 @@ nothrow extern(System) EventLoopImpl* evLoop = &evLoopMain.m_evLoop; scope(exit) { FreeAddrInfoExW(infos); - assumeWontThrow(AsyncDNSRequest.free(overlapped.resolve)); - assumeWontThrow(AsyncOverlapped.free(overlapped)); } if (err != EWIN.WSA_OK) { evLoop.setInternalError!"GetAddrInfoExW"(Status.ABORT, string.init, err); return; } + trace("5"); NetworkAddress* addr = cast(NetworkAddress*)overlapped.resolve.dns.addr; ubyte* pAddr = cast(ubyte*) infos.ai_addr; ubyte* data = cast(ubyte*) addr.sockAddr; data[0 .. infos.ai_addrlen] = pAddr[0 .. infos.ai_addrlen]; // perform bit copy addr.family = cast(ushort)infos.ai_family; + trace("6"); overlapped.resolve.dns.cmdInfo().ready.trigger(*evLoopMain); + trace("7"); } void onOverlappedReceiveComplete(error_t error, DWORD recvCount, AsyncOverlapped* overlapped, DWORD flags)