From 057d6af92db03990bdb1dcefc2819ca9180a922b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Ludwig?= Date: Mon, 7 Jan 2013 18:52:58 +0100 Subject: [PATCH] Implemented direct sending of files over TCP connections for the win32 back end. See issue #. --- source/vibe/core/drivers/win32.d | 71 ++++++++++++++++++++++++++++++-- 1 file changed, 68 insertions(+), 3 deletions(-) diff --git a/source/vibe/core/drivers/win32.d b/source/vibe/core/drivers/win32.d index 8d20ba6117..8388972967 100644 --- a/source/vibe/core/drivers/win32.d +++ b/source/vibe/core/drivers/win32.d @@ -31,6 +31,8 @@ import std.utf; enum WM_USER_SIGNAL = WM_USER+101; enum WM_USER_SOCKET = WM_USER+102; +pragma(lib, "wsock32.lib"); + /******************************************************************************/ /* class Win32EventDriver */ /******************************************************************************/ @@ -44,6 +46,8 @@ class Win32EventDriver : EventDriver { int m_timerIdCounter = 0; SocketEventHandler[SOCKET] m_socketHandlers; HANDLE[] m_registeredEvents; + HANDLE m_fileCompletionEvent; + bool[Win32TcpConnection] m_fileWriters; } this(DriverCore core) @@ -59,6 +63,9 @@ class Win32EventDriver : EventDriver { WSADATA wd; enforce(WSAStartup(0x0202, &wd) == 0, "Failed to initialize WinSock"); + + m_fileCompletionEvent = CreateEventW(null, false, false, null); + m_registeredEvents ~= m_fileCompletionEvent; } ~this() @@ -101,7 +108,15 @@ class Win32EventDriver : EventDriver { private void waitForEvents(uint timeout) { - MsgWaitForMultipleObjectsEx(cast(DWORD)m_registeredEvents.length, m_registeredEvents.ptr, timeout, QS_ALLEVENTS, MWMO_ALERTABLE|MWMO_INPUTAVAILABLE); + auto ret = MsgWaitForMultipleObjectsEx(cast(DWORD)m_registeredEvents.length, m_registeredEvents.ptr, timeout, QS_ALLEVENTS, MWMO_ALERTABLE|MWMO_INPUTAVAILABLE); + if( ret == WAIT_OBJECT_0 ){ + Win32TcpConnection[] to_remove; + foreach( fw; m_fileWriters.byKey ) + if( fw.testFileWritten() ) + to_remove ~= fw; + foreach( fw; to_remove ) + m_fileWriters.remove(fw); + } } void exitEventLoop() @@ -945,6 +960,9 @@ class Win32TcpConnection : TcpConnection, SocketEventHandler { DWORD m_bytesTransferred; ConnectionStatus m_status = ConnectionStatus.Initialized; FixedRingBuffer!(ubyte, 64*1024) m_readBuffer; + + HANDLE m_transferredFile; + OVERLAPPED m_fileOverlapped; } this(Win32EventDriver driver, SOCKET sock) @@ -954,6 +972,13 @@ class Win32TcpConnection : TcpConnection, SocketEventHandler { m_task = Task.getThis(); m_driver.m_socketHandlers[sock] = this; + // setup overlapped structure for copy-less file sending + m_fileOverlapped.Internal = 0; + m_fileOverlapped.InternalHigh = 0; + m_fileOverlapped.Offset = 0; + m_fileOverlapped.OffsetHigh = 0; + m_fileOverlapped.hEvent = m_driver.m_fileCompletionEvent; + WSAAsyncSelect(sock, m_driver.m_hwnd, WM_USER_SOCKET, FD_READ|FD_WRITE|FD_CLOSE); } @@ -1121,6 +1146,24 @@ m_status = ConnectionStatus.Connected; void write(InputStream stream, ulong nbytes = 0, bool do_flush = true) { + import vibe.core.drivers.threadedfile; + // special case sending of files + if( auto fstream = cast(Win32FileStream)stream ){ + if( fstream.tell() == 0 && fstream.size <= 1<<31 ){ + logDebug("Using sendfile! %s %s %s", fstream.m_handle, fstream.tell(), fstream.size); + + m_bytesTransferred = 0; + m_driver.m_fileWriters[this] = true; + if( TransmitFile(m_socket, fstream.m_handle, 0, 0, &m_fileOverlapped, null, 0) ) + m_bytesTransferred = 1; + + enforce(WSAGetLastError() == WSA_IO_PENDING, "Failed to send file over TCP."); + + while( m_bytesTransferred < fstream.size ) m_driver.m_core.yieldForEvent(); + return; + } + } + writeDefault(stream, nbytes, do_flush); } @@ -1129,6 +1172,20 @@ m_status = ConnectionStatus.Connected; // TODO! } + bool testFileWritten() + { + if( !GetOverlappedResult(m_transferredFile, &m_fileOverlapped, &m_bytesTransferred, false) ){ + if( GetLastError() != ERROR_IO_PENDING ){ + m_driver.m_core.resumeTask(m_task, new Exception("File transfer over TCP failed.")); + return true; + } + return false; + } else { + m_driver.m_core.resumeTask(m_task); + return true; + } + } + void notifySocketEvent(SOCKET sock, WORD event, WORD error) nothrow { try { @@ -1334,6 +1391,8 @@ private extern(System) nothrow alias void function(DWORD, DWORD, OVERLAPPED*) LPOVERLAPPED_COMPLETION_ROUTINE; DWORD GetCurrentThreadId(); + + HANDLE CreateEventW(SECURITY_ATTRIBUTES* lpEventAttributes, BOOL bManualReset, BOOL bInitialState, LPCWSTR lpName); BOOL PostThreadMessageW(DWORD idThread, UINT Msg, WPARAM wParam, LPARAM lParam); DWORD MsgWaitForMultipleObjectsEx(DWORD nCount, const(HANDLE) *pHandles, DWORD dwMilliseconds, DWORD dwWakeMask, DWORD dwFlags); BOOL CloseHandle(HANDLE hObject); @@ -1348,6 +1407,12 @@ private extern(System) nothrow LONG DispatchMessageW(MSG *lpMsg); BOOL PostMessageW(HWND hwnd, UINT msg, WPARAM wPara, LPARAM lParam); BOOL SetEndOfFile(HANDLE hFile); + BOOL GetOverlappedResult(HANDLE hFile, OVERLAPPED* lpOverlapped, DWORD* lpNumberOfBytesTransferred, BOOL bWait); + + enum { + ERROR_ALREADY_EXISTS = 183, + ERROR_IO_PENDING = 997 + } struct FILE_NOTIFY_INFORMATION { DWORD NextEntryOffset; @@ -1478,6 +1543,7 @@ private extern(System) nothrow alias void function(DWORD, DWORD, WSAOVERLAPPEDX*, DWORD) LPWSAOVERLAPPED_COMPLETION_ROUTINEX; alias void function(DWORD, DWORD, WSAOVERLAPPEDX*) LPLOOKUPSERVICE_COMPLETION_ROUTINE; alias void* LPCONDITIONPROC; + alias void* LPTRANSMIT_FILE_BUFFERS; SOCKET WSAAccept(SOCKET s, sockaddr *addr, INT* addrlen, LPCONDITIONPROC lpfnCondition, DWORD_PTR dwCallbackData); int WSAAsyncSelect(SOCKET s, HWND hWnd, uint wMsg, sizediff_t lEvent); @@ -1492,10 +1558,9 @@ private extern(System) nothrow int getaddrinfo(LPCSTR pName, LPCSTR pServiceName, const ADDRINFOA *pHints, ADDRINFOA **ppResult); void FreeAddrInfoW(ADDRINFOEXW* pAddrInfo); void freeaddrinfo(ADDRINFOA* ai); + BOOL TransmitFile(SOCKET hSocket, HANDLE hFile, DWORD nNumberOfBytesToWrite, DWORD nNumberOfBytesPerSend, OVERLAPPED* lpOverlapped, LPTRANSMIT_FILE_BUFFERS lpTransmitBuffers, DWORD dwFlags); - const uint ERROR_ALREADY_EXISTS = 183; - struct GUID { uint Data1;