Skip to content

Commit

Permalink
Added new features and bugfixes #1
Browse files Browse the repository at this point in the history
  • Loading branch information
zhenghaven committed May 2, 2024
2 parents acde6ee + 7eb9e98 commit ea448b4
Show file tree
Hide file tree
Showing 8 changed files with 870 additions and 98 deletions.
8 changes: 4 additions & 4 deletions .github/workflows/unit-tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@ jobs:
strategy:
fail-fast: false
matrix:
os: [ ubuntu-latest, windows-latest, macos-latest ]
os: [ ubuntu-22.04, windows-latest, macos-latest ]
std: [ 11, 20 ]

steps:
- uses: actions/checkout@v2
- uses: actions/checkout@v3

- name: Configure CMake
# Configure CMake in a 'build' subdirectory. `CMAKE_BUILD_TYPE` is only required if you are using a single-configuration generator such as make.
Expand Down Expand Up @@ -57,6 +57,6 @@ jobs:
run: >
ctest
-C ${{env.BUILD_TYPE}}
--parallel 10
--repeat-until-fail 100
--parallel 20
--repeat-until-fail 200
--output-on-failure
16 changes: 16 additions & 0 deletions include/SimpleSysIO/StreamAcceptorBase.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#pragma once


#include <functional>
#include <memory>

#include "StreamSocketBase.hpp"
Expand All @@ -20,6 +21,11 @@ namespace SIMPLESYSIO_CUSTOMIZED_NAMESPACE

class StreamAcceptorBase
{
public: // static members:

using AsyncAcceptCallback =
std::function<void(std::unique_ptr<StreamSocketBase>, bool)>;

public:


Expand All @@ -39,6 +45,16 @@ class StreamAcceptorBase
*/
virtual std::unique_ptr<StreamSocketBase> Accept() = 0;


/**
* @brief Accept a new connection asynchronously
*
* @param callback the callback function to be called when a new connection
* is accepted, or an error occurs
*/
virtual void AsyncAccept(AsyncAcceptCallback callback) = 0;


}; // class StreamSocketServerBase

} // namespace SimpleSysIO
274 changes: 273 additions & 1 deletion include/SimpleSysIO/StreamSocketBase.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@
#include <cstddef>
#include <cstdint>

#include <functional>
#include <type_traits>
#include <vector>

#include <SimpleObjects/RealNumCast.hpp>

Expand All @@ -23,12 +25,21 @@ namespace SIMPLESYSIO_CUSTOMIZED_NAMESPACE
#endif
{


struct StreamSocketRaw;


class StreamSocketBase
{
public: //static members:

using EndianType = Internal::Obj::Endian;

using AsyncRecvCallback = std::function<void(std::vector<uint8_t>, bool)>;

friend struct StreamSocketRaw;
friend struct StreamSocketAsync;

public:
StreamSocketBase() = default;

Expand Down Expand Up @@ -249,6 +260,56 @@ class StreamSocketBase
}


/**
* @brief The very basic interface to receive data asynchronously
*
* @param buffSize The size of the intermediate buffer used to store
* received data
* The child class implementation should allocate this
* buffer internally and pass it to the callback function
* @param callback The callback function to be called when the data is
* received
*/
virtual void AsyncRecvRaw(size_t buffSize, AsyncRecvCallback callback) = 0;


virtual void AsyncRecvRawUntilComplete(
size_t expSize,
AsyncRecvCallback callback
)
{
AsyncRecvRawUntilCompleteImpl implCallbackFunctor(
this,
expSize,
std::move(callback),
std::make_shared<std::vector<uint8_t> >()
);
AsyncRecvCallback implCallback = std::move(implCallbackFunctor);

AsyncRecvRaw(expSize, std::move(implCallback));
}

template<
typename _ContainerType,
typename _SizeType = uint64_t,
EndianType _TransmitEndian = EndianType::little
>
void AsyncSizedRecvBytes(
std::function<void(_ContainerType, bool)> callback
)
{
AsyncSizedRecvBytesDataImpl<_ContainerType> dataCallback(
std::move(callback)
);
AsyncSizedRecvBytesSizeImpl<
_ContainerType,
_SizeType,
_TransmitEndian
> sizeCallback(this, std::move(dataCallback));

AsyncRecvRawUntilComplete(sizeof(_SizeType), sizeCallback);
}

protected:


Expand Down Expand Up @@ -325,6 +386,217 @@ class StreamSocketBase
);
}
}
};

private:

struct AsyncRecvRawUntilCompleteImpl
{
AsyncRecvRawUntilCompleteImpl(
StreamSocketBase* socket,
size_t expSize,
AsyncRecvCallback callback,
std::shared_ptr<std::vector<uint8_t> > cached
) :
m_socket(socket),
m_expSize(expSize),
m_callback(std::move(callback)),
m_cached(std::move(cached))
{}

AsyncRecvRawUntilCompleteImpl(AsyncRecvRawUntilCompleteImpl&& other) :
m_socket(std::move(other.m_socket)),
m_expSize(std::move(other.m_expSize)),
m_callback(std::move(other.m_callback)),
m_cached(std::move(other.m_cached))
{}

AsyncRecvRawUntilCompleteImpl(
const AsyncRecvRawUntilCompleteImpl& other
) :
m_socket(other.m_socket),
m_expSize(other.m_expSize),
m_callback(other.m_callback),
m_cached(other.m_cached)
{}

void operator()(std::vector<uint8_t> buf, bool hasErrorOccurred)
{
if (!hasErrorOccurred)
{
m_cached->insert(
m_cached->end(),
std::make_move_iterator(buf.begin()),
std::make_move_iterator(buf.end())
);

if (m_cached->size() < m_expSize)
{
// we need to receive more data

AsyncRecvRawUntilCompleteImpl nextCallbackFunctor(
m_socket,
m_expSize,
m_callback,
m_cached
);
AsyncRecvCallback nextCallback =
std::move(nextCallbackFunctor);

m_socket->AsyncRecvRaw(
m_expSize - m_cached->size(),
std::move(nextCallback)
);
}
else
{
// we have received enough data
m_callback(std::move(*m_cached), false);
}
}
else
{
// error occurred
m_callback(std::vector<uint8_t>(), true);
}
}

StreamSocketBase* m_socket;
size_t m_expSize;
AsyncRecvCallback m_callback;
std::shared_ptr<std::vector<uint8_t> > m_cached;
}; // struct AsyncRecvRawUntilCompleteImpl

template<typename _ContainerType>
struct AsyncSizedRecvBytesDataImpl
{
using CallbackType = std::function<void(_ContainerType, bool)>;
using DestValType = typename _ContainerType::value_type;
static constexpr size_t sk_destValSize = sizeof(DestValType);

AsyncSizedRecvBytesDataImpl(
CallbackType callback
):
m_callback(std::move(callback))
{}

AsyncSizedRecvBytesDataImpl(AsyncSizedRecvBytesDataImpl&& other) :
m_callback(std::move(other.m_callback))
{}

AsyncSizedRecvBytesDataImpl(
const AsyncSizedRecvBytesDataImpl& other
) :
m_callback(other.m_callback)
{}

void operator()(std::vector<uint8_t> buf, bool hasErrorOccurred)
{
if (!hasErrorOccurred)
{
size_t destSize =
(buf.size() + sk_destValSize - 1) / sk_destValSize;

_ContainerType data;
data.resize(destSize);

std::memcpy(&(data[0]), buf.data(), buf.size());

m_callback(data, hasErrorOccurred);
}
else
{
// error occurred or socket has been closed
m_callback(_ContainerType(), true);
}
}

CallbackType m_callback;
}; // struct AsyncSizedRecvBytesDataImpl

template<
typename _ContainerType,
typename _SizeType = uint64_t,
EndianType _TransmitEndian = EndianType::little
>
struct AsyncSizedRecvBytesSizeImpl
{
using DataCallbackType = AsyncSizedRecvBytesDataImpl<_ContainerType>;

AsyncSizedRecvBytesSizeImpl(
StreamSocketBase* socket,
DataCallbackType dataCallback
):
m_socket(socket),
m_dataCallback(std::move(dataCallback))
{}

AsyncSizedRecvBytesSizeImpl(AsyncSizedRecvBytesSizeImpl&& other) :
m_socket(std::move(other.m_socket)),
m_dataCallback(std::move(other.m_dataCallback))
{}

AsyncSizedRecvBytesSizeImpl(
const AsyncSizedRecvBytesSizeImpl& other
) :
m_socket(other.m_socket),
m_dataCallback(other.m_dataCallback)
{}

void operator()(std::vector<uint8_t> buf, bool hasErrorOccurred)
{
if (!hasErrorOccurred)
{
_SizeType size = 0;
std::memcpy(&size, buf.data(), sizeof(_SizeType));

// Convert endianness from transmit --> native
size = Internal::EndianConvert<
_TransmitEndian,
EndianType::native
>::Primitive(size);

m_socket->AsyncRecvRawUntilComplete(
size,
m_dataCallback
);
}
else
{
// error occurred or socket has been closed
m_dataCallback(std::vector<uint8_t>(), true);
}
}

StreamSocketBase* m_socket;
DataCallbackType m_dataCallback;
}; // struct AsyncSizedRecvBytesSizeImpl

}; // class StreamSocketBase


struct StreamSocketRaw
{

static size_t Send(StreamSocketBase& sock, const void* data, size_t size)
{
return sock.SendRaw(data, size);
}

static size_t Recv(StreamSocketBase& sock, void* buf, size_t size)
{
return sock.RecvRaw(buf, size);
}

static void AsyncRecv(
StreamSocketBase& sock,
size_t buffSize,
typename StreamSocketBase::AsyncRecvCallback callback
)
{
sock.AsyncRecvRaw(buffSize, std::move(callback));
}

}; // struct StreamSocketRaw


} // namespace SimpleSysIO
Loading

0 comments on commit ea448b4

Please sign in to comment.