Skip to content

Commit

Permalink
Implemented direct sending of files over TCP connections for the win3…
Browse files Browse the repository at this point in the history
…2 back end. See issue #.
  • Loading branch information
s-ludwig committed Jan 7, 2013
1 parent 8528ed8 commit 057d6af
Showing 1 changed file with 68 additions and 3 deletions.
71 changes: 68 additions & 3 deletions source/vibe/core/drivers/win32.d
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ import std.utf;
enum WM_USER_SIGNAL = WM_USER+101;
enum WM_USER_SOCKET = WM_USER+102;

pragma(lib, "wsock32.lib");

/******************************************************************************/
/* class Win32EventDriver */
/******************************************************************************/
Expand All @@ -44,6 +46,8 @@ class Win32EventDriver : EventDriver {
int m_timerIdCounter = 0;
SocketEventHandler[SOCKET] m_socketHandlers;
HANDLE[] m_registeredEvents;
HANDLE m_fileCompletionEvent;
bool[Win32TcpConnection] m_fileWriters;
}

this(DriverCore core)
Expand All @@ -59,6 +63,9 @@ class Win32EventDriver : EventDriver {

WSADATA wd;
enforce(WSAStartup(0x0202, &wd) == 0, "Failed to initialize WinSock");

m_fileCompletionEvent = CreateEventW(null, false, false, null);
m_registeredEvents ~= m_fileCompletionEvent;
}

~this()
Expand Down Expand Up @@ -101,7 +108,15 @@ class Win32EventDriver : EventDriver {

private void waitForEvents(uint timeout)
{
MsgWaitForMultipleObjectsEx(cast(DWORD)m_registeredEvents.length, m_registeredEvents.ptr, timeout, QS_ALLEVENTS, MWMO_ALERTABLE|MWMO_INPUTAVAILABLE);
auto ret = MsgWaitForMultipleObjectsEx(cast(DWORD)m_registeredEvents.length, m_registeredEvents.ptr, timeout, QS_ALLEVENTS, MWMO_ALERTABLE|MWMO_INPUTAVAILABLE);
if( ret == WAIT_OBJECT_0 ){
Win32TcpConnection[] to_remove;
foreach( fw; m_fileWriters.byKey )
if( fw.testFileWritten() )
to_remove ~= fw;
foreach( fw; to_remove )
m_fileWriters.remove(fw);
}
}

void exitEventLoop()
Expand Down Expand Up @@ -945,6 +960,9 @@ class Win32TcpConnection : TcpConnection, SocketEventHandler {
DWORD m_bytesTransferred;
ConnectionStatus m_status = ConnectionStatus.Initialized;
FixedRingBuffer!(ubyte, 64*1024) m_readBuffer;

HANDLE m_transferredFile;
OVERLAPPED m_fileOverlapped;
}

this(Win32EventDriver driver, SOCKET sock)
Expand All @@ -954,6 +972,13 @@ class Win32TcpConnection : TcpConnection, SocketEventHandler {
m_task = Task.getThis();
m_driver.m_socketHandlers[sock] = this;

// setup overlapped structure for copy-less file sending
m_fileOverlapped.Internal = 0;
m_fileOverlapped.InternalHigh = 0;
m_fileOverlapped.Offset = 0;
m_fileOverlapped.OffsetHigh = 0;
m_fileOverlapped.hEvent = m_driver.m_fileCompletionEvent;

WSAAsyncSelect(sock, m_driver.m_hwnd, WM_USER_SOCKET, FD_READ|FD_WRITE|FD_CLOSE);
}

Expand Down Expand Up @@ -1121,6 +1146,24 @@ m_status = ConnectionStatus.Connected;

void write(InputStream stream, ulong nbytes = 0, bool do_flush = true)
{
import vibe.core.drivers.threadedfile;
// special case sending of files
if( auto fstream = cast(Win32FileStream)stream ){
if( fstream.tell() == 0 && fstream.size <= 1<<31 ){
logDebug("Using sendfile! %s %s %s", fstream.m_handle, fstream.tell(), fstream.size);

m_bytesTransferred = 0;
m_driver.m_fileWriters[this] = true;
if( TransmitFile(m_socket, fstream.m_handle, 0, 0, &m_fileOverlapped, null, 0) )
m_bytesTransferred = 1;

enforce(WSAGetLastError() == WSA_IO_PENDING, "Failed to send file over TCP.");

while( m_bytesTransferred < fstream.size ) m_driver.m_core.yieldForEvent();
return;
}
}

writeDefault(stream, nbytes, do_flush);
}

Expand All @@ -1129,6 +1172,20 @@ m_status = ConnectionStatus.Connected;
// TODO!
}

bool testFileWritten()
{
if( !GetOverlappedResult(m_transferredFile, &m_fileOverlapped, &m_bytesTransferred, false) ){
if( GetLastError() != ERROR_IO_PENDING ){
m_driver.m_core.resumeTask(m_task, new Exception("File transfer over TCP failed."));
return true;
}
return false;
} else {
m_driver.m_core.resumeTask(m_task);
return true;
}
}

void notifySocketEvent(SOCKET sock, WORD event, WORD error)
nothrow {
try {
Expand Down Expand Up @@ -1334,6 +1391,8 @@ private extern(System) nothrow
alias void function(DWORD, DWORD, OVERLAPPED*) LPOVERLAPPED_COMPLETION_ROUTINE;

DWORD GetCurrentThreadId();

HANDLE CreateEventW(SECURITY_ATTRIBUTES* lpEventAttributes, BOOL bManualReset, BOOL bInitialState, LPCWSTR lpName);
BOOL PostThreadMessageW(DWORD idThread, UINT Msg, WPARAM wParam, LPARAM lParam);
DWORD MsgWaitForMultipleObjectsEx(DWORD nCount, const(HANDLE) *pHandles, DWORD dwMilliseconds, DWORD dwWakeMask, DWORD dwFlags);
BOOL CloseHandle(HANDLE hObject);
Expand All @@ -1348,6 +1407,12 @@ private extern(System) nothrow
LONG DispatchMessageW(MSG *lpMsg);
BOOL PostMessageW(HWND hwnd, UINT msg, WPARAM wPara, LPARAM lParam);
BOOL SetEndOfFile(HANDLE hFile);
BOOL GetOverlappedResult(HANDLE hFile, OVERLAPPED* lpOverlapped, DWORD* lpNumberOfBytesTransferred, BOOL bWait);

enum {
ERROR_ALREADY_EXISTS = 183,
ERROR_IO_PENDING = 997
}

struct FILE_NOTIFY_INFORMATION {
DWORD NextEntryOffset;
Expand Down Expand Up @@ -1478,6 +1543,7 @@ private extern(System) nothrow
alias void function(DWORD, DWORD, WSAOVERLAPPEDX*, DWORD) LPWSAOVERLAPPED_COMPLETION_ROUTINEX;
alias void function(DWORD, DWORD, WSAOVERLAPPEDX*) LPLOOKUPSERVICE_COMPLETION_ROUTINE;
alias void* LPCONDITIONPROC;
alias void* LPTRANSMIT_FILE_BUFFERS;

SOCKET WSAAccept(SOCKET s, sockaddr *addr, INT* addrlen, LPCONDITIONPROC lpfnCondition, DWORD_PTR dwCallbackData);
int WSAAsyncSelect(SOCKET s, HWND hWnd, uint wMsg, sizediff_t lEvent);
Expand All @@ -1492,10 +1558,9 @@ private extern(System) nothrow
int getaddrinfo(LPCSTR pName, LPCSTR pServiceName, const ADDRINFOA *pHints, ADDRINFOA **ppResult);
void FreeAddrInfoW(ADDRINFOEXW* pAddrInfo);
void freeaddrinfo(ADDRINFOA* ai);
BOOL TransmitFile(SOCKET hSocket, HANDLE hFile, DWORD nNumberOfBytesToWrite, DWORD nNumberOfBytesPerSend, OVERLAPPED* lpOverlapped, LPTRANSMIT_FILE_BUFFERS lpTransmitBuffers, DWORD dwFlags);


const uint ERROR_ALREADY_EXISTS = 183;

struct GUID
{
uint Data1;
Expand Down

0 comments on commit 057d6af

Please sign in to comment.