Skip to content

Commit

Permalink
merging latest changes from master to branch-1.6
Browse files Browse the repository at this point in the history
  • Loading branch information
skaarthik authored Aug 23, 2016
2 parents ea63acf + 1215ae6 commit 3364663
Show file tree
Hide file tree
Showing 53 changed files with 508 additions and 206 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<img src='logo/mobius-star-200.png' width='125px' alt='Mobius logo' />
# Mobius: C# API for Spark

[Mobius](https://github.com/Microsoft/Mobius) provides C# language binding to [Apache Spark](https://spark.apache.org/), enabling the implementation of Spark driver code and data processing operations in the languages supported in the .NET framework like C# or F#.
[Mobius](https://github.com/Microsoft/Mobius) provides C# language binding to [Apache Spark](https://spark.apache.org/) enabling the implementation of Spark driver program and data processing operations in the languages supported in the .NET framework like C# or F#.

For example, the word count sample in Apache Spark can be implemented in C# as follows :

Expand Down Expand Up @@ -89,7 +89,7 @@ Refer to [Mobius C# API documentation](./csharp/Adapter/documentation/Mobius_API

Mobius API usage samples are available at:

* [Examples folder](./examples) which contains standalone [C#/F# projects](./notes/running-mobius-app.md#running-mobius-examples-in-local-mode) that can be used as templates to start developing Mobius applications
* [Examples folder](./examples) which contains standalone [C# and F# projects](./notes/running-mobius-app.md#running-mobius-examples-in-local-mode) that can be used as templates to start developing Mobius applications

* [Samples project](./csharp/Samples/Microsoft.Spark.CSharp/) which uses a comprehensive set of Mobius APIs to implement samples that are also used for functional validation of APIs

Expand Down
122 changes: 88 additions & 34 deletions cpp/Riosock/Riosock.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,10 @@ DWORD CQUsed = 0;

LONG RIOSockRef;
PrioritizedLock *CQAccessLock = nullptr;
RIO_CQ CompletionQueue = nullptr;
RIO_RQ RequestQueue = nullptr;
RIO_NOTIFICATION_COMPLETION CompletionType;
RIO_CQ RecvCQ = nullptr;
RIO_CQ SendCQ = nullptr;
RIO_NOTIFICATION_COMPLETION RecvCompletionType;
RIO_NOTIFICATION_COMPLETION SendCompletionType;
RIO_EXTENSION_FUNCTION_TABLE RIOFuncs = { 0 };

//
Expand Down Expand Up @@ -268,40 +269,73 @@ HRESULT RIOSOCKAPI RIOSockInitialize()
return E_OUTOFMEMORY;
}

// Create IOCP handle
auto iocpHandle = CreateIoCompletionPort(INVALID_HANDLE_VALUE, nullptr, 0, 0);
if (iocpHandle == nullptr)
// Create IOCP handle for RECV CQ
auto iocpHandleOfRecv = CreateIoCompletionPort(INVALID_HANDLE_VALUE, nullptr, 0, 0);
if (iocpHandleOfRecv == nullptr)
{
DWORD errorCode = GetLastError();
delete CQAccessLock;
SetLastError(errorCode);
return HRESULT_FROM_WIN32(errorCode);
}

// Create IOCP handle for SEND CQ
auto iocpHandleOfSend = CreateIoCompletionPort(INVALID_HANDLE_VALUE, nullptr, 0, 0);
if (iocpHandleOfSend == nullptr)
{
DWORD errorCode = GetLastError();
// Close IOCP handle
CloseHandle(iocpHandleOfRecv);
delete CQAccessLock;
SetLastError(errorCode);
return HRESULT_FROM_WIN32(errorCode);
}

// Create OVERLAPPED
auto overLapped = calloc(1, sizeof(OVERLAPPED));
if (nullptr == overLapped) {
// Close IOCP handle
CloseHandle(iocpHandle);
CloseHandle(iocpHandleOfRecv);
CloseHandle(iocpHandleOfSend);
delete CQAccessLock;
SetLastError(WSAENOBUFS);
return HRESULT_FROM_WIN32(WSAENOBUFS);
}

// With RIO, we don't associate the IOCP handle with the socket like 'typical' sockets
// - Instead we directly pass the IOCP handle through RIOCreateCompletionQueue
::ZeroMemory(&CompletionType, sizeof(CompletionType));

CompletionType.Type = RIO_IOCP_COMPLETION;
CompletionType.Iocp.CompletionKey = reinterpret_cast<void*>(1);
CompletionType.Iocp.Overlapped = overLapped;
CompletionType.Iocp.IocpHandle = iocpHandle;
::ZeroMemory(&RecvCompletionType, sizeof(RecvCompletionType));
RecvCompletionType.Type = RIO_IOCP_COMPLETION;
RecvCompletionType.Iocp.CompletionKey = reinterpret_cast<void*>(1);
RecvCompletionType.Iocp.Overlapped = overLapped;
RecvCompletionType.Iocp.IocpHandle = iocpHandleOfRecv;

::ZeroMemory(&SendCompletionType, sizeof(SendCompletionType));
SendCompletionType.Type = RIO_IOCP_COMPLETION;
SendCompletionType.Iocp.CompletionKey = reinterpret_cast<void*>(1);
SendCompletionType.Iocp.Overlapped = overLapped;
SendCompletionType.Iocp.IocpHandle = iocpHandleOfSend;

// Create a completion queue
CompletionQueue = CreateRIOCompletionQueue(DefaultRIOCQSize, &CompletionType);
if (RIO_INVALID_CQ == CompletionQueue) {
// Create a completion queue for RECV
RecvCQ = CreateRIOCompletionQueue(DefaultRIOCQSize, &RecvCompletionType);
if (RIO_INVALID_CQ == RecvCQ) {
DWORD errorCode = WSAGetLastError();
CloseHandle(iocpHandleOfRecv);
CloseHandle(iocpHandleOfSend);
free(overLapped);
delete CQAccessLock;
SetLastError(errorCode);
return HRESULT_FROM_WIN32(errorCode);
}

// Create a completion queue for SEND
SendCQ = CreateRIOCompletionQueue(DefaultRIOCQSize, &SendCompletionType);
if (RIO_INVALID_CQ == SendCQ) {
DWORD errorCode = WSAGetLastError();
CloseHandle(iocpHandle);
CloseRIOCompletionQueue(RecvCQ);
RecvCQ = RIO_INVALID_CQ;
CloseHandle(iocpHandleOfRecv);
CloseHandle(iocpHandleOfSend);
free(overLapped);
delete CQAccessLock;
SetLastError(errorCode);
Expand Down Expand Up @@ -331,18 +365,29 @@ void RIOSOCKAPI RIOSockUninitialize()
InterlockedDecrement(&RIOSockRef);
if (RIOSockRef > 0) return;

if (CompletionQueue != RIO_INVALID_CQ) {
CloseRIOCompletionQueue(CompletionQueue);
CompletionQueue = RIO_INVALID_CQ;
if (RecvCQ != RIO_INVALID_CQ) {
CloseRIOCompletionQueue(RecvCQ);
RecvCQ = RIO_INVALID_CQ;
}

if (SendCQ != RIO_INVALID_CQ) {
CloseRIOCompletionQueue(SendCQ);
SendCQ = RIO_INVALID_CQ;
}

if (CompletionType.Iocp.IocpHandle != nullptr) {
CloseHandle(CompletionType.Iocp.IocpHandle);
CompletionType.Iocp.IocpHandle = nullptr;
if (RecvCompletionType.Iocp.IocpHandle != nullptr) {
CloseHandle(RecvCompletionType.Iocp.IocpHandle);
RecvCompletionType.Iocp.IocpHandle = nullptr;
}

free(CompletionType.Iocp.Overlapped);
CompletionType.Iocp.Overlapped = nullptr;
if (SendCompletionType.Iocp.IocpHandle != nullptr) {
CloseHandle(SendCompletionType.Iocp.IocpHandle);
SendCompletionType.Iocp.IocpHandle = nullptr;
}

free(RecvCompletionType.Iocp.Overlapped);
RecvCompletionType.Iocp.Overlapped = nullptr;
SendCompletionType.Iocp.Overlapped = nullptr;

delete CQAccessLock;
CQAccessLock = nullptr;
Expand Down Expand Up @@ -513,19 +558,22 @@ BOOL RIOSOCKAPI PostRIOSend(
// Description:
// This function registers the method to use for notification behavior.
//
// Parameters:
// isRecvCq - Indicates whether register Notify at RecvCQ or SendCQ.
//
// Result:
// Returns TRUE if no error occurs. Otherwise, a value of FALSE is returned.
//-
FORCEINLINE
BOOL RIOSOCKAPI RegisterRIONotify()
BOOL RIOSOCKAPI RegisterRIONotify(_In_ BOOL isRecvCq)
{
auto hr = EnsureWinSockMethods(INVALID_SOCKET);
if (FAILED(hr))
{
return FALSE;
}

auto notify = RIOFuncs.RIONotify(CompletionQueue);
auto notify = RIOFuncs.RIONotify((isRecvCq == TRUE) ? RecvCQ : SendCQ);
if (notify != ERROR_SUCCESS) {
SetLastError(notify);
return FALSE;
Expand Down Expand Up @@ -633,7 +681,8 @@ BOOL RIOSOCKAPI AllocateRIOCompletion(
newCQSize = RIO_MAX_CQ_SIZE;
}

if (!ResizeRIOCompletionQueue(CompletionQueue, newCQSize))
if (!ResizeRIOCompletionQueue(RecvCQ, newCQSize) &&
!ResizeRIOCompletionQueue(SendCQ, newCQSize))
{
return FALSE;
}
Expand Down Expand Up @@ -684,6 +733,7 @@ BOOL RIOSOCKAPI ReleaseRIOCompletion(
// It will always post a Notify with proper synchronization.
//
// Parameters:
// isRecvCq - Indicates whether dequeue results from RecvCQ or SendCQ
// rioResults - An array of RIORESULT structures to receive the description of the completions dequeued.
// rioResultSize - The maximum number of entries in the rioResults to write.
//
Expand All @@ -694,6 +744,7 @@ BOOL RIOSOCKAPI ReleaseRIOCompletion(
//-
FORCEINLINE
DWORD RIOSOCKAPI DequeueRIOResults(
_In_ BOOL isRecvCq,
_Out_ PRIORESULT rioResults,
_In_ DWORD rioResultSize
)
Expand All @@ -702,7 +753,7 @@ DWORD RIOSOCKAPI DequeueRIOResults(
// dequeuing. So it can add space to the CQ
AutoReleaseDefaultLock defaultLock(*CQAccessLock);

auto resultCount = DequeueRIOCompletion(CompletionQueue, rioResults, rioResultSize);
auto resultCount = DequeueRIOCompletion((isRecvCq == TRUE) ? RecvCQ : SendCQ, rioResults, rioResultSize);
if (0 == resultCount || RIO_CORRUPT_CQ == resultCount)
{
// We were notified there were completions, but we can't dequeue any IO
Expand All @@ -711,7 +762,7 @@ DWORD RIOSOCKAPI DequeueRIOResults(
}

// Immediately after invoking Dequeue, post another Notify
auto notifyResult = RegisterRIONotify();
auto notifyResult = RegisterRIONotify(isRecvCq);
if (notifyResult == FALSE)
{
// if notify fails, we can't reliably know when the next IO completes
Expand Down Expand Up @@ -762,8 +813,8 @@ RIO_RQ RIOSOCKAPI CreateRIORequestQueue(
1,
maxOutstandingSend,
1,
CompletionQueue,
CompletionQueue,
RecvCQ,
SendCQ,
socketContext
);
}
Expand Down Expand Up @@ -811,19 +862,22 @@ BOOL RIOSOCKAPI ResizeRIORequestQueue(
// This function calls GetQueuedCompletionStatus() internally to dequeue an IO completion packet.
// If there is no completion packet queued, the function blocks the thread.
//
// Parameters:
// isRecvCq - Indicates whether get the status from the RecvCQ or SendCQ
//
// Result:
// If no error occurs, it returns a new request queue. Otherwise, a value of RIO_INVALID_RQ is returned.
//-
FORCEINLINE
BOOL RIOSOCKAPI GetRIOCompletionStatus()
BOOL RIOSOCKAPI GetRIOCompletionStatus(_In_ BOOL isRecvCq)
{
DWORD bytesTransferred;
ULONG_PTR completionKey;
OVERLAPPED *pov = nullptr;


if (!GetQueuedCompletionStatus(
CompletionType.Iocp.IocpHandle,
(isRecvCq == TRUE) ? RecvCompletionType.Iocp.IocpHandle : SendCompletionType.Iocp.IocpHandle,
&bytesTransferred,
&completionKey,
&pov,
Expand Down
23 changes: 14 additions & 9 deletions cpp/Riosock/Riosock.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,9 @@ extern "C" {
);

// Registers the method to use for notification behavior.
BOOL RIOSOCKAPI RegisterRIONotify();
BOOL RIOSOCKAPI RegisterRIONotify(
_In_ BOOL isRecvCq
);

// Registers a specified buffer for use with RIO Socket
RIO_BUFFERID RIOSOCKAPI RegisterRIOBuffer(
Expand All @@ -77,27 +79,30 @@ extern "C" {

// Dequeues RIO results from the I/O completion queue used with RIO socket.
DWORD RIOSOCKAPI DequeueRIOResults(
_In_ BOOL isRecvCq,
_Out_ PRIORESULT rioResults,
_In_ DWORD rioResultSize
);

// Creates a request queue
RIO_RQ RIOSOCKAPI CreateRIORequestQueue(
_In_ SOCKET socket,
_In_ ULONG maxOutstandingReceive,
_In_ ULONG maxOutstandingSend,
_In_ PVOID socketContext
_In_ SOCKET socket,
_In_ ULONG maxOutstandingReceive,
_In_ ULONG maxOutstandingSend,
_In_ PVOID socketContext
);

// Resizes a request queue
BOOL RIOSOCKAPI ResizeRIORequestQueue(
_In_ RIO_RQ rq,
_In_ DWORD maxOutstandingReceive,
_In_ DWORD maxOutstandingSend
_In_ RIO_RQ rq,
_In_ DWORD maxOutstandingReceive,
_In_ DWORD maxOutstandingSend
);

// Dequeues an IO completion packet
BOOL RIOSOCKAPI GetRIOCompletionStatus();
BOOL RIOSOCKAPI GetRIOCompletionStatus(
_In_ BOOL isRecvCq
);

#ifdef __cplusplus
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ internal class ConfigurationService : IConfigurationService
public const string CSharpSocketTypeEnvName = "spark.mobius.CSharp.socketType";
public const string CSharpWorkerReadBufferSizeEnvName = "spark.mobius.CSharpWorker.readBufferSize";
public const string CSharpWorkerWriteBufferSizeEnvName = "spark.mobius.CSharpWorker.writeBufferSize";
public const string ExecutorCoresEnvName = "spark.executor.cores";
public const string SPARKCLR_HOME = "SPARKCLR_HOME";
public const string SPARK_MASTER = "spark.master";
public const string CSHARPBACKEND_PORT = "CSHARPBACKEND_PORT";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ private object CallJavaMethod(bool isStatic, object classNameOrJvmObjectReferenc
using (var s = socket.GetStream())
{
SerDe.Write(s, overallPayload);
s.Flush();

var isMethodCallFailed = SerDe.ReadInt(s);
//TODO - add boolean instead of int in the backend
Expand Down
3 changes: 3 additions & 0 deletions csharp/Adapter/Microsoft.Spark.CSharp/Network/ByteBufChunk.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
using System.Runtime.InteropServices;
using System.Security;
using System.Text;
using Microsoft.Spark.CSharp.Services;

namespace Microsoft.Spark.CSharp.Network
{
Expand All @@ -18,6 +19,7 @@ namespace Microsoft.Spark.CSharp.Network
/// </summary>
internal sealed class ByteBufChunk
{
private readonly ILoggerService logger = LoggerServiceFactory.GetLogger(typeof(RioSocketWrapper));
private readonly Queue<Segment> segmentQueue;
private readonly int segmentSize;
private bool disposed;
Expand Down Expand Up @@ -286,6 +288,7 @@ private void Dispose(bool disposing)
return;
}

logger.LogDebug("Disposing ByteBufChunk [{0}].", ToString());
if (!IsUnsafe && memory != null)
{
memory = null;
Expand Down
Loading

0 comments on commit 3364663

Please sign in to comment.