diff --git a/cmake/Install.cmake b/cmake/Install.cmake index a28548742a..9e153bd632 100644 --- a/cmake/Install.cmake +++ b/cmake/Install.cmake @@ -23,6 +23,7 @@ install( FILES_MATCHING PATTERN "*.h" PATTERN "*.hpp" + PATTERN "${PROJECT_SOURCE_DIR}/sese/internal" EXCLUDE PATTERN "${PROJECT_SOURCE_DIR}/sese/test" EXCLUDE PATTERN "${PROJECT_SOURCE_DIR}/sese/example" EXCLUDE ) diff --git a/sese/CMakeLists.txt b/sese/CMakeLists.txt index 33362a259b..17cf9157a0 100644 --- a/sese/CMakeLists.txt +++ b/sese/CMakeLists.txt @@ -155,7 +155,7 @@ endif() # Windows platform configuration # ###################################################################################################################### if(${CMAKE_SYSTEM_NAME} MATCHES "Windows") - file(GLOB_RECURSE NATIVE_WIN_SRC "internal/win/*.cpp" "native/win/*.cpp" "native/win/*.h") + file(GLOB_RECURSE NATIVE_WIN_SRC "internal/win/*.cpp" "internal/win/*.h") target_sources(Core PRIVATE ${NATIVE_WIN_SRC}) target_compile_definitions(Core PRIVATE -D_WIN32_WINNT=0x0601) @@ -182,8 +182,7 @@ if(${CMAKE_SYSTEM_NAME} MATCHES "Linux") target_link_libraries(Core PRIVATE ${CMAKE_THREAD_LIBS_INIT}) target_link_libraries(Core PRIVATE ${CMAKE_DL_LIBS}) - file(GLOB_RECURSE NATIVE_LINUX_SRC "internal/linux/*.cpp" "internal/linux/*.h" "native/linux/*.cpp" - "native/linux/*.h" + file(GLOB_RECURSE NATIVE_LINUX_SRC "internal/linux/*.cpp" "internal/linux/*.h" ) target_sources(Core PRIVATE ${NATIVE_LINUX_SRC}) @@ -197,8 +196,7 @@ endif() if(${CMAKE_SYSTEM_NAME} MATCHES "Darwin") target_link_libraries(Core PRIVATE ${COREFOUNDATION_FRAMEWORK} ${IOKIT_FRAMEWORK} ${CORESERVICES_FRAMEWORK}) - file(GLOB_RECURSE NATIVE_DARWIN_SRC "internal/darwin/*.cpp" "internal/darwin/*.h" "native/darwin/*.cpp" - "native/darwin/*.h" + file(GLOB_RECURSE NATIVE_DARWIN_SRC "internal/darwin/*.cpp" "internal/darwin/*.h" ) target_sources(Core PRIVATE ${NATIVE_DARWIN_SRC}) endif() diff --git a/sese/example/CMakeLists.txt b/sese/example/CMakeLists.txt index 046868cde8..0714a3f22a 100644 --- a/sese/example/CMakeLists.txt +++ b/sese/example/CMakeLists.txt @@ -7,10 +7,6 @@ include(${PROJECT_SOURCE_DIR}/cmake/Manifest.cmake) add_definitions("-DPROJECT_PATH=\"${PROJECT_SOURCE_DIR}\"") add_definitions("-DPROJECT_BINARY_PATH=\"${CMAKE_CURRENT_BINARY_DIR}\"") -add_executable(EchoServer EchoServer.cpp) -target_link_libraries(EchoServer PRIVATE Core) -target_manifest(EchoServer manifest.json) - add_executable(HttpServer HttpServer.cpp) target_link_libraries(HttpServer PRIVATE Core) target_manifest(HttpServer manifest.json) diff --git a/sese/example/EchoServer.cpp b/sese/example/EchoServer.cpp deleted file mode 100644 index 2b11277e30..0000000000 --- a/sese/example/EchoServer.cpp +++ /dev/null @@ -1,75 +0,0 @@ -// Copyright 2024 libsese -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include -#include -#include -#include - -class MyIOCPServer : public sese::iocp::IOCPServer { -public: - MyIOCPServer() { - setDeleteContextCallback(myDeleter); - } - - void onAcceptCompleted(sese::iocp::Context *ctx) override { - SESE_INFO("onAcceptCompleted {}", ctx->getFd()); - postRead(ctx); - setTimeout(ctx, 10); - } - - void onPreRead(sese::iocp::Context *ctx) override { - cancelTimeout(ctx); - SESE_INFO("onPreRead {}", ctx->getFd()); - } - - void onReadCompleted(sese::iocp::Context *ctx) override { - SESE_INFO("onReadCompleted {}", ctx->getFd()); - sese::streamMove(ctx, ctx, IOCP_WSABUF_SIZE); - postWrite(ctx); - } - - void onWriteCompleted(sese::iocp::Context *ctx) override { - SESE_INFO("onWriteCompleted {}", ctx->getFd()); - postRead(ctx); - setTimeout(ctx, 10); - } - - void onTimeout(sese::iocp::Context *ctx) override { - SESE_INFO("onTimeout {}", ctx->getFd()); - postClose(ctx); - } - - static void myDeleter(sese::iocp::Context *ctx) { - SESE_INFO("onDeleteCallback {}", ctx->getFd()); - } -}; - -MyIOCPServer server; - -int main(int argc, char **argv) { - sese::initCore(argc, argv); - auto address = sese::net::IPv4Address::any(8080); - server.setThreads(2); - server.setAddress(address); - auto rt = server.init(); - if (!rt) { - SESE_ERROR("server init failed!"); - return 0; - } - SESE_INFO("server listening on {}", address->getPort()); - getchar(); - server.shutdown(); - return 0; -} \ No newline at end of file diff --git a/sese/internal/darwin/net/event/KqueueEvent.h b/sese/internal/darwin/net/event/KqueueEvent.h deleted file mode 100644 index 546f6bca5d..0000000000 --- a/sese/internal/darwin/net/event/KqueueEvent.h +++ /dev/null @@ -1,29 +0,0 @@ -// Copyright 2024 libsese -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -/** - * @file KqueueEvent.h - * @brief kqueue event - * @author kaoru - */ - -#pragma once - -#include "sese/net/event/BaseEvent.h" - -namespace sese::event { -/// kqueue event -class KqueueEvent : public BaseEvent { -}; -} // namespace sese::event diff --git a/sese/internal/darwin/net/event/KqueueEventLoop.cpp b/sese/internal/darwin/net/event/KqueueEventLoop.cpp deleted file mode 100644 index 6ce994fe10..0000000000 --- a/sese/internal/darwin/net/event/KqueueEventLoop.cpp +++ /dev/null @@ -1,181 +0,0 @@ -// Copyright 2024 libsese -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include - -#include -#include -#include - -#include - -#define MAX_EVENT_SIZE 64 - -bool sese::event::KqueueEventLoop::init() { - kqueue = ::kqueue(); - if (-1 == kqueue) return false; - if (0 >= listenFd) return true; - - this->listenEvent = new KqueueEvent; - this->listenEvent->fd = listenFd; - this->listenEvent->events = EVENT_NULL; - this->listenEvent->data = nullptr; - - struct kevent kevent {}; - EV_SET(&kevent, listenFd, EVFILT_READ, EV_ADD | EV_ENABLE, 0, 0, this->listenEvent); - if (-1 == ::kevent(kqueue, &kevent, 1, nullptr, 0, nullptr)) { - delete this->listenEvent; - this->listenEvent = nullptr; - - close(this->kqueue); - this->kqueue = -1; - - return false; - } - return true; -} - -sese::event::KqueueEventLoop::~KqueueEventLoop() { - if (-1 != kqueue) { - close(kqueue); - kqueue = -1; - } - - if (listenEvent) { - delete listenEvent; - listenEvent = nullptr; - } -} - -void sese::event::KqueueEventLoop::dispatch(uint32_t time) { - struct kevent events[MAX_EVENT_SIZE]{}; - auto div = ldiv(time, 1000); - struct timespec timeout { - div.quot, div.rem * 1000000 - }; - - auto number_of_fds = ::kevent(kqueue, nullptr, 0, events, MAX_EVENT_SIZE, &timeout); - if (-1 == number_of_fds) return; - for (int i = 0; i < number_of_fds; ++i) { - auto event = reinterpret_cast(events[i].udata); - if (events[i].ident == listenFd) { - auto client = accept(listenFd, nullptr, nullptr); - if (client == -1) continue; - onAccept(client); - continue; - } else { - if (events[i].flags & EV_ERROR && event->events & EVENT_ERROR) { - onError(event); - continue; - } - if (events[i].filter == EVFILT_READ) { - if (event->events & EVENT_READ) { - onRead(event); - } - - if ((events[i].flags & EV_EOF) && handleClose) { - onClose(event); - continue; - } - } else if (events[i].filter == EVFILT_WRITE) { - if ((events[i].flags & EV_EOF) && handleClose) { - onClose(event); - continue; - } - - if (event->events & EVENT_WRITE) { - onWrite(reinterpret_cast(events[i].udata)); - } - } - } - } -} - -void sese::event::KqueueEventLoop::onAccept(int fd) { -} - -void sese::event::KqueueEventLoop::onRead(sese::event::BaseEvent *event) { -} - -void sese::event::KqueueEventLoop::onWrite(sese::event::BaseEvent *event) { -} - -void sese::event::KqueueEventLoop::onError(sese::event::BaseEvent *event) { -} - -void sese::event::KqueueEventLoop::onClose(sese::event::BaseEvent *event) { -} - -// bool sese::event::KqueueEventLoop::addNativeEvent(int fd, uint32_t ev, void *data) const { -// struct kevent kevent {}; -// EV_SET(&kevent, fd, ev, EV_ADD | EV_ENABLE | EV_ONESHOT, 0, 0, data); -// return 0 == ::kevent(kqueue, &kevent, 1, nullptr, 0, nullptr); -// } -// -// bool sese::event::KqueueEventLoop::delNativeEvent(int fd, uint32_t ev, void *data) const { -// struct kevent kevent {}; -// EV_SET(&kevent, fd, ev, EV_DELETE, 0, 0, data); -// return 0 == ::kevent(kqueue, &kevent, 1, nullptr, 0, nullptr); -// } - -sese::event::BaseEvent *sese::event::KqueueEventLoop::createEvent(int fd, unsigned int events, void *data) { - auto event = new KqueueEvent; - event->fd = fd; - event->events = events; - // event->oldEvents = events; - event->data = data; - - struct kevent kevent {}; - if (events & EVENT_READ) { - EV_SET(&kevent, event->fd, EVFILT_READ, EV_ADD | EV_ENABLE | EV_ONESHOT, 0, 0, event); - ::kevent(kqueue, &kevent, 1, nullptr, 0, nullptr); - } - if (events & EVENT_WRITE) { - EV_SET(&kevent, event->fd, EVFILT_WRITE, EV_ADD | EV_ENABLE | EV_ONESHOT, 0, 0, event); - ::kevent(kqueue, &kevent, 1, nullptr, 0, nullptr); - } - - return event; -} - -void sese::event::KqueueEventLoop::freeEvent(sese::event::BaseEvent *event) { - // auto fd = event->fd; - // auto events = event->events; - // if (events & EVENT_READ) { - // delNativeEvent(fd, EVFILT_READ, event); - // } - // delNativeEvent(fd, EVFILT_READ, event); - // if (events & EVENT_WRITE) { - // delNativeEvent(fd, EVFILT_WRITE, event); - // } - - delete event; -} - -bool sese::event::KqueueEventLoop::setEvent(sese::event::BaseEvent *event) { - auto e = reinterpret_cast(event); - auto events = e->events; - - struct kevent kevent {}; - if (events & EVENT_READ) { - EV_SET(&kevent, event->fd, EVFILT_READ, EV_ADD | EV_ENABLE | EV_ONESHOT, 0, 0, event); - ::kevent(kqueue, &kevent, 1, nullptr, 0, nullptr); - } - if (events & EVENT_WRITE) { - EV_SET(&kevent, event->fd, EVFILT_WRITE, EV_ADD | EV_ENABLE | EV_ONESHOT, 0, 0, event); - ::kevent(kqueue, &kevent, 1, nullptr, 0, nullptr); - } - - return event; -} \ No newline at end of file diff --git a/sese/internal/darwin/net/event/KqueueEventLoop.h b/sese/internal/darwin/net/event/KqueueEventLoop.h deleted file mode 100644 index 9615294884..0000000000 --- a/sese/internal/darwin/net/event/KqueueEventLoop.h +++ /dev/null @@ -1,67 +0,0 @@ -// Copyright 2024 libsese -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -/** - * @file KqueueEventLoop.h - * @brief kqueue event loop - * @author kaoru - */ - -#pragma once - -#include -#include - -#include - -namespace sese::event { -/// kqueue event loop -class KqueueEventLoop : public BaseEventLoop { -public: - bool init() override; - - ~KqueueEventLoop() override; - - void dispatch(uint32_t timeout) override; - - void onAccept(int fd) override; - - void onRead(BaseEvent *event) override; - - void onWrite(BaseEvent *event) override; - - void onError(BaseEvent *event) override; - - void onClose(BaseEvent *event) override; - - BaseEvent *createEvent(int fd, unsigned int events, void *data) override; - - void freeEvent(BaseEvent *event) override; - - bool setEvent(BaseEvent *event) override; - - void setListenFd(int fd) override { listenFd = fd; } - - //protected: - // bool addNativeEvent(int fd, uint32_t ev, void *data) const; - // - // bool delNativeEvent(int fd, uint32_t ev, void *data) const; - -protected: - int listenFd{-1}; - BaseEvent *listenEvent{nullptr}; - int kqueue{-1}; -}; - -} // namespace sese::event \ No newline at end of file diff --git a/sese/internal/linux/net/event/EpollEvent.h b/sese/internal/linux/net/event/EpollEvent.h deleted file mode 100644 index d661a1e352..0000000000 --- a/sese/internal/linux/net/event/EpollEvent.h +++ /dev/null @@ -1,29 +0,0 @@ -// Copyright 2024 libsese -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -/** - * @file EpollEvent.h - * @brief epoll event - * @author kaoru - */ - -#pragma once - -#include "sese/net/event/BaseEvent.h" - -namespace sese::event { -/// epoll event -class EpollEvent : public BaseEvent { -}; -} // namespace sese::event \ No newline at end of file diff --git a/sese/internal/linux/net/event/EpollEventConvert.cpp b/sese/internal/linux/net/event/EpollEventConvert.cpp deleted file mode 100644 index 2f8376f781..0000000000 --- a/sese/internal/linux/net/event/EpollEventConvert.cpp +++ /dev/null @@ -1,45 +0,0 @@ -// Copyright 2024 libsese -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include "sese/internal/linux/net/event/EpollEventConvert.h" - -#include - -unsigned int sese::event::EpollEventConvert::fromNativeEvent(int event) { - short result = 0; - // if (event & EPOLLIN) { - // result |= EVENT_READ; - // } - if (event & EPOLLOUT) { - result |= EVENT_WRITE; - } - if (event & EPOLLERR) { - result |= EVENT_ERROR; - } - return result; -} - -int sese::event::EpollEventConvert::toNativeEvent(unsigned int event) { - int result = 0; - if (event & EVENT_READ) { - result |= EPOLLIN; - } - if (event & EVENT_WRITE) { - result |= EPOLLOUT; - } - if (event & EVENT_ERROR) { - result |= EPOLLERR; - } - return result; -} diff --git a/sese/internal/linux/net/event/EpollEventConvert.h b/sese/internal/linux/net/event/EpollEventConvert.h deleted file mode 100644 index d9808bb6f8..0000000000 --- a/sese/internal/linux/net/event/EpollEventConvert.h +++ /dev/null @@ -1,33 +0,0 @@ -// Copyright 2024 libsese -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -/** -* @file EpollEventConvert.h -* @brief epoll event convert -* @author kaoru -*/ - -#pragma once - -#include "sese/net/event/BaseEventConvert.h" - -namespace sese::event { -/// epoll epoll event convert -class EpollEventConvert : public BaseEventConvert { -public: - unsigned int fromNativeEvent(int event) override; - - int toNativeEvent(unsigned int event) override; -}; -} // namespace sese::event \ No newline at end of file diff --git a/sese/internal/linux/net/event/EpollEventLoop.cpp b/sese/internal/linux/net/event/EpollEventLoop.cpp deleted file mode 100644 index 0ff1bde5d6..0000000000 --- a/sese/internal/linux/net/event/EpollEventLoop.cpp +++ /dev/null @@ -1,144 +0,0 @@ -// Copyright 2024 libsese -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include "sese/net/event/BaseEvent.h" -#include "sese/internal/linux/net/event/EpollEventLoop.h" -#include "sese/internal/linux/net/event/EpollEvent.h" - -#include -#include -#include -#include - -#define MAX_EVENT_SIZE 64 - -bool sese::event::EpollEventLoop::init() { - epoll = epoll_create1(0); - if (-1 == epoll) return false; - if (0 >= listenFd) return true; - - this->listenEvent = this->createEvent(listenFd, EVENT_READ | EVENT_ERROR, nullptr); - if (!this->listenEvent) return false; - return true; -} - -sese::event::EpollEventLoop::~EpollEventLoop() { - if (-1 != epoll) { - close(epoll); - epoll = -1; - } - - if (listenEvent) { - delete listenEvent; - listenEvent = nullptr; - } -} - -void sese::event::EpollEventLoop::dispatch(uint32_t timeout) { - epoll_event events[MAX_EVENT_SIZE]{}; - - int number_of_fds = epoll_wait(epoll, events, MAX_EVENT_SIZE, (int) timeout); - if (-1 == number_of_fds) { - return; - } - - for (int i = 0; i < number_of_fds; ++i) { - auto event = (BaseEvent *) events[i].data.ptr; - auto fd = event->fd; - if (fd == listenFd) { - auto client = accept(fd, nullptr, nullptr); - if (-1 != client) { - onAccept(client); - } else { - continue; - } - } else { - if (events[i].events & EPOLLERR) { - onError((BaseEvent *) events[i].data.ptr); - } - if (events[i].events & EPOLLIN) { - if (event->events & EVENT_READ) { - onRead((BaseEvent *) events[i].data.ptr); - } - if (events[i].events & EPOLLRDHUP) { - onClose((BaseEvent *) events[i].data.ptr); - continue; - } - } - if (events[i].events & EPOLLOUT) { - if (events[i].events & EPOLLRDHUP) { - onClose((BaseEvent *) events[i].data.ptr); - continue; - } - if (event->events & EVENT_WRITE) { - onWrite((BaseEvent *) events[i].data.ptr); - } - } - } - } -} - -void sese::event::EpollEventLoop::onAccept(int fd) { -} - -void sese::event::EpollEventLoop::onRead(BaseEvent *event) { -} - -void sese::event::EpollEventLoop::onWrite(BaseEvent *event) { -} - -void sese::event::EpollEventLoop::onError(BaseEvent *event) { -} - -void sese::event::EpollEventLoop::onClose(sese::event::BaseEvent *event) { -} - -sese::event::BaseEvent *sese::event::EpollEventLoop::createEvent(int fd, unsigned int events, void *data) { - auto event = new EpollEvent; - event->fd = fd; - event->events = events; - event->data = data; - - epoll_event epoll_event{}; - if (handleClose) { - epoll_event.events = convert.toNativeEvent(events) | EPOLLIN | EPOLLET | EPOLLRDHUP; - } else { - epoll_event.events = convert.toNativeEvent(events) | EPOLLIN | EPOLLET; - } - epoll_event.data.ptr = event; - if (-1 == epoll_ctl(epoll, EPOLL_CTL_ADD, fd, &epoll_event)) { - delete event; - return nullptr; - } - - return event; -} - -void sese::event::EpollEventLoop::freeEvent(sese::event::BaseEvent *event) { - epoll_ctl(epoll, EPOLL_CTL_DEL, event->fd, nullptr); - delete event; -} - -bool sese::event::EpollEventLoop::setEvent(sese::event::BaseEvent *event) { - epoll_event epoll_event{}; - if (handleClose) { - epoll_event.events = convert.toNativeEvent(event->events) | EPOLLIN | EPOLLET | EPOLLRDHUP; - } else { - epoll_event.events = convert.toNativeEvent(event->events) | EPOLLIN | EPOLLET; - } - epoll_event.data.ptr = event; - - auto result = epoll_ctl(epoll, EPOLL_CTL_MOD, event->fd, &epoll_event); - return (result == 0 || (result == -1 && errno == EEXIST)); -} \ No newline at end of file diff --git a/sese/internal/linux/net/event/EpollEventLoop.h b/sese/internal/linux/net/event/EpollEventLoop.h deleted file mode 100644 index 8a502593da..0000000000 --- a/sese/internal/linux/net/event/EpollEventLoop.h +++ /dev/null @@ -1,63 +0,0 @@ -// Copyright 2024 libsese -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -/** -* @file EpollEventLoop.h -* @brief epoll event loop -* @author kaoru -*/ - -#pragma once - -#include "sese/net/event/BaseEvent.h" -#include "sese/net/event/BaseEventLoop.h" -#include "sese/internal/linux/net/event/EpollEvent.h" -#include "sese/internal/linux/net/event/EpollEventConvert.h" - -namespace sese::event { -/// epoll event loop -class EpollEventLoop : public sese::event::BaseEventLoop { -public: - bool init() override; - - ~EpollEventLoop() override; - - void dispatch(uint32_t timeout) override; - - void onAccept(int fd) override; - - void onRead(BaseEvent *event) override; - - void onWrite(BaseEvent *event) override; - - void onError(BaseEvent *event) override; - - void onClose(BaseEvent *event) override; - - BaseEvent *createEvent(int fd, unsigned int events, void *data) override; - - void freeEvent(BaseEvent *event) override; - - bool setEvent(BaseEvent *event) override; - - void setListenFd(int fd) override { listenFd = fd; } - -protected: - int listenFd{-1}; - BaseEvent *listenEvent{nullptr}; - - int epoll{-1}; - EpollEventConvert convert; -}; -} // namespace sese::event \ No newline at end of file diff --git a/sese/internal/service/http/HttpConnection.h b/sese/internal/service/http/HttpConnection.h index 72cce76058..c8b880775f 100644 --- a/sese/internal/service/http/HttpConnection.h +++ b/sese/internal/service/http/HttpConnection.h @@ -17,7 +17,7 @@ #include #include -#include +#include #include #include diff --git a/sese/internal/win/net/event/WSAEvent.h b/sese/internal/win/net/event/WSAEvent.h deleted file mode 100644 index 23293c5a8d..0000000000 --- a/sese/internal/win/net/event/WSAEvent.h +++ /dev/null @@ -1,31 +0,0 @@ -// Copyright 2024 libsese -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -/** - * @file WSAEvent.h - * @brief WSAEventSelect event - * @author kaoru - */ - -#pragma once - -#include "sese/net/event/BaseEvent.h" - -namespace sese::event { -/// WSAEventSelect event -class WSAEvent : public BaseEvent { -public: - void *wsaEvent{nullptr}; -}; -} // namespace sese::event \ No newline at end of file diff --git a/sese/internal/win/net/event/WSAEventConvert.cpp b/sese/internal/win/net/event/WSAEventConvert.cpp deleted file mode 100644 index c7dbca4aa9..0000000000 --- a/sese/internal/win/net/event/WSAEventConvert.cpp +++ /dev/null @@ -1,39 +0,0 @@ -// Copyright 2024 libsese -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include "sese/internal/win/net/event/WSAEventConvert.h" - -#include - -unsigned int sese::event::WSAEventConvert::fromNativeEvent(int event) { - unsigned result = 0; - if (event & FD_READ) { - result |= EVENT_READ; - } - if (event & FD_WRITE) { - result |= EVENT_WRITE; - } - return result; -} - -int sese::event::WSAEventConvert::toNativeEvent(unsigned int event) { - int result = 0; - if (event & EVENT_READ) { - result |= FD_READ; - } - if (event & EVENT_WRITE) { - result |= FD_WRITE; - } - return result; -} diff --git a/sese/internal/win/net/event/WSAEventConvert.h b/sese/internal/win/net/event/WSAEventConvert.h deleted file mode 100644 index a35e301cf2..0000000000 --- a/sese/internal/win/net/event/WSAEventConvert.h +++ /dev/null @@ -1,33 +0,0 @@ -// Copyright 2024 libsese -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -/** - * @file WSAEventConvert.h - * @brief WSAEventSelect event convert - * @author kaoru - */ - -#pragma once - -#include "sese/net/event/BaseEventConvert.h" - -namespace sese::event { -/// WSAEventSelect event convert -class WSAEventConvert : public BaseEventConvert { -public: - unsigned int fromNativeEvent(int event) override; - - int toNativeEvent(unsigned int event) override; -}; -} // namespace sese::event \ No newline at end of file diff --git a/sese/internal/win/net/event/WSAEventLoop.cpp b/sese/internal/win/net/event/WSAEventLoop.cpp deleted file mode 100644 index d8d62d00de..0000000000 --- a/sese/internal/win/net/event/WSAEventLoop.cpp +++ /dev/null @@ -1,178 +0,0 @@ -// Copyright 2024 libsese -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include "sese/internal/win/net/event/WSAEventLoop.h" - -#include - -bool sese::event::WSAEventLoop::init() { - wsaEvent = WSACreateEvent(); - if (WSA_INVALID_EVENT == wsaEvent) return false; - if (0 >= listenFd) return true; - - if (WSAEventSelect(listenFd, wsaEvent, FD_ACCEPT)) { - WSACloseEvent(wsaEvent); - wsaEvent = nullptr; - return false; - } - - this->listenEvent = new WSAEvent; - this->listenEvent->events = EVENT_ERROR; - this->listenEvent->fd = listenFd; - this->listenEvent->wsaEvent = wsaEvent; - - sockets[0] = listenFd; - wsaEvents[0] = listenEvent->wsaEvent; - events[0] = listenEvent; - numbers += 1; - - return true; -} - -sese::event::WSAEventLoop::~WSAEventLoop() { - if (wsaEvent) { - WSACloseEvent(wsaEvent); - wsaEvent = nullptr; - } - - if (listenEvent) { - delete listenEvent; - listenEvent = nullptr; - } -} - -void sese::event::WSAEventLoop::dispatch(uint32_t timeout) { - DWORD n_index = WSAWaitForMultipleEvents(numbers, wsaEvents, FALSE, timeout, FALSE); - if (n_index == WSA_WAIT_FAILED || n_index == WSA_WAIT_TIMEOUT) return; - - n_index -= WSA_WAIT_EVENT_0; - for (DWORD i = n_index; i < numbers; ++i) { - n_index = ::WSAWaitForMultipleEvents(1, &wsaEvents[i], TRUE, timeout, FALSE); - if (n_index == WSA_WAIT_FAILED || n_index == WSA_WAIT_TIMEOUT) continue; - - WSANETWORKEVENTS enum_event; - WSAEnumNetworkEvents(sockets[i], wsaEvents[i], &enum_event); - if (enum_event.lNetworkEvents & FD_ACCEPT) { - if (enum_event.iErrorCode[FD_ACCEPT_BIT] == 0) { - SOCKET client = accept(sockets[i], nullptr, nullptr); - if (-1 != client) { - onAccept(static_cast(client)); - } - } else if (enum_event.iErrorCode[FD_ACCEPT_BIT] != 0 && events[i]->events & EVENT_ERROR) { - onError(events[i]); - } - } - if ((enum_event.lNetworkEvents & FD_CLOSE) && handleClose) { - // mutex.lock(); - // WSACloseEvent(wsaEvents[i]); - // memmove(&sockets[i], &sockets[i], (numbers - i - 1) * sizeof(SOCKET)); - // memmove(&wsaEvents[i], &wsaEvents[i], (numbers - i - 1) * sizeof(HANDLE)); - // memmove(&events[i], &events[i], (numbers - i - 1) * sizeof(WSAEvent *)); - // numbers -= 1; - // mutex.unlock(); - onClose(events[i]); - } - if (enum_event.lNetworkEvents & FD_READ) { - if (enum_event.iErrorCode[FD_READ_BIT] == 0) { - char buf; - if (1 == recv(sockets[i], &buf, 1, MSG_PEEK)) { - onRead(events[i]); - } - } else if (enum_event.iErrorCode[FD_READ_BIT] != 0 && events[i]->events & EVENT_ERROR) { - onError(events[i]); - } - } - if (enum_event.lNetworkEvents & FD_WRITE) { - if (enum_event.iErrorCode[FD_WRITE_BIT] == 0) { - char buf; - if (0 == send(sockets[i], &buf, 0, 0)) { - onWrite(events[i]); - } - } else if (enum_event.iErrorCode[FD_WRITE_BIT] != 0 && events[i]->events & EVENT_ERROR) { - onError(events[i]); - } - } - } -} - -void sese::event::WSAEventLoop::onAccept(int fd) { -} - -void sese::event::WSAEventLoop::onRead(sese::event::BaseEvent *event) { -} - -void sese::event::WSAEventLoop::onWrite(sese::event::BaseEvent *event) { -} - -void sese::event::WSAEventLoop::onError(sese::event::BaseEvent *event) { -} - -void sese::event::WSAEventLoop::onClose(sese::event::BaseEvent *event) { -} - -sese::event::BaseEvent *sese::event::WSAEventLoop::createEvent(int fd, unsigned int events, void *data) { - WSAEVENT wsa_event = WSACreateEvent(); - if (WSAEventSelect(fd, wsa_event, convert.toNativeEvent(events) | FD_CLOSE)) { - WSACloseEvent(wsa_event); - return nullptr; - } - - auto event = new WSAEvent; - event->fd = fd; - event->events = events; - event->data = data; - event->wsaEvent = wsa_event; - - // mutex.lock(); - sockets[numbers] = fd; - wsaEvents[numbers] = wsa_event; - this->events[numbers] = event; - numbers += 1; - // mutex.unlock(); - - return event; -} - -void sese::event::WSAEventLoop::freeEvent(sese::event::BaseEvent *event) { - bool found = false; - unsigned long i = 0; - for (; i < numbers; ++i) { - if (event->fd == sockets[i]) { - found = true; - break; - } - } - - if (found) { - WSACloseEvent(wsaEvents[i]); - memmove(&sockets[i], &sockets[i], (numbers - i - 1) * sizeof(SOCKET)); - memmove(&wsaEvents[i], &wsaEvents[i], (numbers - i - 1) * sizeof(HANDLE)); - memmove(&events[i], &events[i], (numbers - i - 1) * sizeof(WSAEvent *)); - numbers -= 1; - } else { - // This generally doesn't happen - } - - delete event; -} - -bool sese::event::WSAEventLoop::setEvent(sese::event::BaseEvent *event) { - auto ev = reinterpret_cast(event); - auto rt = 0 == WSAEventSelect(ev->fd, ev->wsaEvent, convert.toNativeEvent(ev->events) | FD_CLOSE); - return rt; -} - -void sese::event::WSAEventLoop::setListenFd(int fd) { - this->listenFd = fd; -} diff --git a/sese/internal/win/net/event/WSAEventLoop.h b/sese/internal/win/net/event/WSAEventLoop.h deleted file mode 100644 index 2795527f53..0000000000 --- a/sese/internal/win/net/event/WSAEventLoop.h +++ /dev/null @@ -1,75 +0,0 @@ -// Copyright 2024 libsese -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -/** - * @file WSAEventLoop.h - * @brief WSAEventSelect event loop - * @author kaoru - */ - -#pragma once - -#include -#include -#include - -#include - -#define MAX_EVENT_SIZE 64 - -namespace sese::event { -/// WSAEventSelect event loop -class WSAEventLoop : public BaseEventLoop { -public: - ~WSAEventLoop() override; - - bool init() override; - - void dispatch(uint32_t timeout) override; - - void onAccept(int fd) override; - - void onRead(BaseEvent *event) override; - - void onWrite(BaseEvent *event) override; - - void onError(BaseEvent *event) override; - - void onClose(BaseEvent *event) override; - - BaseEvent *createEvent(int fd, unsigned int events, void *data) override; - - void freeEvent(BaseEvent *event) override; - - bool setEvent(BaseEvent *event) override; - - void setListenFd(int fd) override; - -protected: - int listenFd{-1}; - WSAEvent *listenEvent{nullptr}; - - void *wsaEvent{nullptr}; - WSAEventConvert convert; - - unsigned long numbers = 0; - unsigned long long sockets[MAX_EVENT_SIZE]{}; - void *wsaEvents[MAX_EVENT_SIZE]{}; - // Here, the lifecycle should be the responsibility of the user - WSAEvent *events[MAX_EVENT_SIZE]{}; -}; - -} // namespace sese::event - -#undef MAX_EVENT_SIZE \ No newline at end of file diff --git a/sese/internal/win/service/iocp/NativeIOCPServer_V1.cpp b/sese/internal/win/service/iocp/NativeIOCPServer_V1.cpp deleted file mode 100644 index 9654dceea3..0000000000 --- a/sese/internal/win/service/iocp/NativeIOCPServer_V1.cpp +++ /dev/null @@ -1,544 +0,0 @@ -// Copyright 2024 libsese -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include -#include - -#include - -#include - -using namespace sese::net; -using namespace sese::iocp; -using namespace sese::_windows::iocp::v1; - -NativeContext::NativeContext(OverlappedWrapper *p_wrapper) : pWrapper(p_wrapper) { - wsabufWrite.buf = static_cast(malloc(IOCP_WSABUF_SIZE)); -} - -NativeContext::~NativeContext() { - free(wsabufWrite.buf); -} - -int64_t NativeContext::read(void *buffer, size_t length) { - if (ssl) { - return SSL_read(static_cast(ssl), buffer, static_cast(length)); - } else { - return recv.read(buffer, length); - } -} - -int64_t NativeContext::write(const void *buffer, size_t length) { - if (ssl) { - return SSL_write(static_cast(ssl), buffer, static_cast(length)); - } else { - return send.write(buffer, length); - } -} - -int64_t NativeContext::peek(void *buffer, size_t length) { - if (ssl) { - return SSL_peek(static_cast(ssl), buffer, static_cast(length)); - } else { - return recv.peek(buffer, length); - } -} - -int64_t NativeContext::trunc(size_t length) { - if (ssl) { - char buffer[1024]{}; - int64_t real = 0; - while (true) { - const auto NEED = std::min(static_cast(length - real), sizeof(buffer)); - if (const int L = SSL_read(static_cast(ssl), buffer, NEED); L > 0) { - real += L; - } else { - break; - } - if (real == length) { - break; - } - } - return real; - } else { - return recv.trunc(length); - } -} - -OverlappedWrapper::OverlappedWrapper() : ctx(this) { -} - -bool NativeIOCPServer::init() { - if (!initConnectEx()) { - return false; - } - - if (address) { - listenFd = WSASocketW( - address->getFamily(), - SOCK_STREAM, - 0, - nullptr, - 0, - WSA_FLAG_OVERLAPPED - ); - if (listenFd == INVALID_SOCKET) { - return false; - } - if (SOCKET_ERROR == Socket::setNonblocking(listenFd)) { - return false; - } - if (SOCKET_ERROR == Socket::bind(listenFd, address->getRawAddress(), address->getRawAddressLength())) { - return false; - } - if (SOCKET_ERROR == Socket::listen(listenFd, SOMAXCONN)) { - return false; - } - } - - iocpFd = CreateIoCompletionPort(INVALID_HANDLE_VALUE, nullptr, 0, static_cast(threads)); - if (iocpFd == INVALID_HANDLE_VALUE) { - if (address) { - Socket::close(listenFd); - listenFd = INVALID_SOCKET; - } - return false; - } - - for (int i = 0; i < threads; ++i) { - auto th = std::make_unique([this] { eventThreadProc(); }, "IOCP_" + std::to_string(i + 1)); - th->start(); - eventThreadGroup.emplace_back(std::move(th)); - } - - acceptThread = std::make_unique([this] { acceptThreadProc(); }, "IOCP_0"); - acceptThread->start(); - - if (sslCtx) { - const auto SERVER_SSL = static_cast(sslCtx->getContext()); - SSL_CTX_set_alpn_select_cb( - SERVER_SSL, - reinterpret_cast(&alpnCallbackFunction), - this - ); - } - - const auto METHOD = BIO_meth_new(BIO_get_new_index() | BIO_TYPE_SOURCE_SINK, "bioIOCP"); - BIO_meth_set_ctrl(METHOD, reinterpret_cast(&NativeIOCPServer::bioCtrl)); - BIO_meth_set_read(METHOD, reinterpret_cast(&NativeIOCPServer::bioRead)); - BIO_meth_set_write(METHOD, reinterpret_cast(&NativeIOCPServer::bioWrite)); - this->bioMethod = METHOD; - - return true; -} - -void NativeIOCPServer::shutdown() { - void *lp_completion_key = nullptr; - isShutdown = true; - for (int i = 0; i < threads; ++i) { - PostQueuedCompletionStatus(iocpFd, -1, reinterpret_cast(lp_completion_key), nullptr); - } - for (auto &&thread: eventThreadGroup) { - thread->join(); - } - eventThreadGroup.clear(); - acceptThread->join(); - acceptThread = nullptr; - - for (auto &&item: wrapperSet) { - deleteContextCallback(&item->ctx); - Socket::close(item->ctx.fd); - delete item; - } - wrapperSet.clear(); - - if (bioMethod) { - BIO_meth_free(static_cast(bioMethod)); - bioMethod = nullptr; - } -} - -void NativeIOCPServer::postRead(NativeIOCPServer::Context *ctx) { - ctx->type = NativeContext::Type::READ; - ctx->readNode = std::make_unique(IOCP_WSABUF_SIZE); - ctx->wsabufRead.buf = static_cast(ctx->readNode->buffer); - ctx->wsabufRead.len = static_cast(ctx->readNode->CAPACITY); - DWORD n_bytes, dw_flags = 0; - int n_rt = WSARecv( - ctx->fd, - &ctx->wsabufRead, - 1, - &n_bytes, - &dw_flags, - &(ctx->pWrapper->overlapped), - nullptr - ); - auto e = getNetworkError(); - if (n_rt == SOCKET_ERROR && e != ERROR_IO_PENDING) { - releaseContext(ctx); - } -} - -void NativeIOCPServer::postWrite(NativeIOCPServer::Context *ctx) { - auto len = ctx->send.peek(ctx->wsabufWrite.buf, IOCP_WSABUF_SIZE); - if (len == 0) { - return; - } - ctx->type = NativeContext::Type::WRITE; - ctx->wsabufWrite.len = static_cast(len); - DWORD n_bytes, dw_flags = 0; - int n_rt = WSASend( - ctx->fd, - &ctx->wsabufWrite, - 1, - &n_bytes, - dw_flags, - &(ctx->pWrapper->overlapped), - nullptr - ); - auto e = getNetworkError(); - if (n_rt == SOCKET_ERROR && e != ERROR_IO_PENDING) { - releaseContext(ctx); - } -} - -void NativeIOCPServer::postClose(NativeIOCPServer::Context *ctx) { - if (activeReleaseMode) { - void *lp_completion_key = nullptr; - ctx->type = Context::Type::CLOSE; - PostQueuedCompletionStatus(iocpFd, 0, reinterpret_cast(lp_completion_key), reinterpret_cast(ctx->pWrapper)); - } else { - releaseContext(ctx); - } -} - -#define ConnectEx ((LPFN_CONNECTEX) connectEx) - -void NativeIOCPServer::postConnect(const net::IPAddress::Ptr &to, const security::SSLContext::Ptr &cli_ctx, void *data) { - auto sock = sese::net::Socket::socket(to->getFamily(), SOCK_STREAM, IPPROTO_IP); - if (to->getFamily() == AF_INET) { - const auto FROM = sese::net::IPv4Address::any(); - sese::net::Socket::bind(sock, FROM->getRawAddress(), FROM->getRawAddressLength()); - } else { - const auto FROM = sese::net::IPv6Address::any(); - sese::net::Socket::bind(sock, FROM->getRawAddress(), FROM->getRawAddressLength()); - } - - sese::net::Socket::setNonblocking(sock); - - auto p_wrapper = new OverlappedWrapper(); - p_wrapper->ctx.fd = sock; - p_wrapper->ctx.self = this; - p_wrapper->ctx.type = NativeContext::Type::CONNECT; - p_wrapper->ctx.data = data; - if (cli_ctx) { - p_wrapper->ctx.ssl = SSL_new(static_cast(cli_ctx->getContext())); - SSL_set_fd(static_cast(p_wrapper->ctx.ssl), static_cast(sock)); - SSL_set_alpn_protos(static_cast(p_wrapper->ctx.ssl), reinterpret_cast(clientProtos.c_str()), static_cast(clientProtos.length())); - } - auto addr = to->getRawAddress(); - auto len = to->getRawAddressLength(); - - CreateIoCompletionPort(reinterpret_cast(p_wrapper->ctx.fd), iocpFd, 0, 0); - BOOL n_rt = ConnectEx(sock, addr, len, nullptr, 0, nullptr, reinterpret_cast(p_wrapper)); - auto e = getNetworkError(); - if (n_rt == FALSE && e != ERROR_IO_PENDING) { - releaseContext(&p_wrapper->ctx); - } else { - wrapperSetMutex.lock(); - wrapperSet.emplace(p_wrapper); - wrapperSetMutex.unlock(); - onPreConnect(&p_wrapper->ctx); - } -} - -#undef ConnectEx - -void NativeIOCPServer::acceptThreadProc() { - using namespace std::chrono_literals; - - while (!isShutdown) { - - if (listenFd != INVALID_SOCKET) { - auto client_socket = accept(listenFd, nullptr, nullptr); - if (client_socket == INVALID_SOCKET) { - // std::this_thread::sleep_for(500ms); - wrapperSetMutex.lock(); - wheel.check(); - wrapperSetMutex.unlock(); - continue; - } - - if (SOCKET_ERROR == Socket::setNonblocking(client_socket)) { - Socket::close(client_socket); - // std::this_thread::sleep_for(500ms); - wrapperSetMutex.lock(); - wheel.check(); - wrapperSetMutex.unlock(); - continue; - } - - auto p_wrapper = new OverlappedWrapper; - p_wrapper->ctx.fd = client_socket; - p_wrapper->ctx.self = this; - - if (sslCtx) { - auto server_ssl = static_cast(sslCtx->getContext()); - auto client_ssl = SSL_new(server_ssl); - SSL_set_fd(client_ssl, static_cast(client_socket)); - SSL_set_accept_state(client_ssl); - - while (true) { - auto rt = SSL_do_handshake(client_ssl); - if (rt <= 0) { - auto err = SSL_get_error(client_ssl, rt); - if (err != SSL_ERROR_WANT_READ && err != SSL_ERROR_WANT_WRITE) { - SSL_free(client_ssl); - Socket::close(client_socket); - delete p_wrapper; - wrapperSetMutex.lock(); - wheel.check(); - wrapperSetMutex.unlock(); - return; - } - } else { - p_wrapper->ctx.ssl = client_ssl; - SSL_set_mode(client_ssl, SSL_MODE_ACCEPT_MOVING_WRITE_BUFFER); - const uint8_t *data = nullptr; - uint32_t data_length; - SSL_get0_alpn_selected(client_ssl, &data, &data_length); - onAlpnGet(&p_wrapper->ctx, data, data_length); - p_wrapper->ctx.bio = BIO_new(static_cast(bioMethod)); - BIO_set_data(static_cast(p_wrapper->ctx.bio), &p_wrapper->ctx); - BIO_set_init(static_cast(p_wrapper->ctx.bio), 1); - BIO_set_shutdown(static_cast(p_wrapper->ctx.bio), 0); - SSL_set_bio(static_cast(p_wrapper->ctx.ssl), static_cast(p_wrapper->ctx.bio), static_cast(p_wrapper->ctx.bio)); - break; - } - } - } - - CreateIoCompletionPort(reinterpret_cast(p_wrapper->ctx.fd), iocpFd, 0, 0); - - wrapperSetMutex.lock(); - wrapperSet.emplace(p_wrapper); - wheel.check(); - wrapperSetMutex.unlock(); - - onAcceptCompleted(&p_wrapper->ctx); - } else { - std::this_thread::sleep_for(500ms); - wrapperSetMutex.lock(); - wheel.check(); - wrapperSetMutex.unlock(); - } - } -} - -void NativeIOCPServer::eventThreadProc() { - OverlappedWrapper *p_wrapper{}; - DWORD lp_number_of_bytes_transferred = 0; - void *lp_completion_key = nullptr; - - while (!isShutdown) { - GetQueuedCompletionStatus( - iocpFd, - &lp_number_of_bytes_transferred, - reinterpret_cast(&lp_completion_key), - reinterpret_cast(&p_wrapper), - INFINITE - ); - if (p_wrapper == nullptr) { - continue; - } else if (lp_number_of_bytes_transferred == 0 && p_wrapper->ctx.type != NativeContext::Type::CONNECT) { - // Active release mode is turned off on the peer - // Involuntary shutdown in any mode - if (activeReleaseMode || p_wrapper->ctx.type != NativeContext::Type::CLOSE) { - releaseContext(&p_wrapper->ctx); - } - continue; - } else if (lp_number_of_bytes_transferred == -1) { - break; - } - - if (p_wrapper->ctx.type == NativeContext::Type::READ) { - onPreRead(&p_wrapper->ctx); - p_wrapper->ctx.readNode->size = lp_number_of_bytes_transferred; - p_wrapper->ctx.recv.push(std::move(p_wrapper->ctx.readNode)); - p_wrapper->ctx.readNode = nullptr; - p_wrapper->ctx.wsabufRead.buf = nullptr; - p_wrapper->ctx.wsabufRead.len = 0; - if (lp_number_of_bytes_transferred == IOCP_WSABUF_SIZE) { - postRead(&p_wrapper->ctx); - } else { - onReadCompleted(&p_wrapper->ctx); - } - } else if (p_wrapper->ctx.type == NativeContext::Type::WRITE) { - p_wrapper->ctx.send.trunc(lp_number_of_bytes_transferred); - auto len = p_wrapper->ctx.send.peek(p_wrapper->ctx.wsabufWrite.buf, IOCP_WSABUF_SIZE); - if (len == 0) { - onWriteCompleted(&p_wrapper->ctx); - } else { - p_wrapper->ctx.type = NativeContext::Type::WRITE; - p_wrapper->ctx.wsabufWrite.len = static_cast(len); - DWORD n_bytes, dw_flags = 0; - int n_rt = WSASend( - p_wrapper->ctx.fd, - &p_wrapper->ctx.wsabufWrite, - 1, - &n_bytes, - dw_flags, - &(p_wrapper->overlapped), - nullptr - ); - auto e = getNetworkError(); - if (n_rt == SOCKET_ERROR && e != ERROR_IO_PENDING) { - releaseContext(&p_wrapper->ctx); - } - } - } else { - auto connect_status = GetOverlappedResult(reinterpret_cast(p_wrapper->ctx.fd), reinterpret_cast(p_wrapper), &lp_number_of_bytes_transferred, TRUE); - if (connect_status == FALSE) { - releaseContext(&p_wrapper->ctx); - continue; - } - - auto ssl = static_cast(p_wrapper->ctx.ssl); - if (ssl) { - SSL_set_connect_state(ssl); - // GCOVR_EXCL_START - while (true) { - auto rt = SSL_do_handshake((SSL *) ssl); - if (rt <= 0) { - // err is SSL_ERROR_WANT_READ or SSL_ERROR_WANT_WRITE - auto err = SSL_get_error((SSL *) ssl, rt); - if (err != SSL_ERROR_WANT_READ && err != SSL_ERROR_WANT_WRITE) { - SSL_free((SSL *) ssl); - p_wrapper->ctx.ssl = nullptr; - ssl = nullptr; - break; - } - } else { - break; - } - } - // GCOVR_EXCL_STOP - if (ssl == nullptr) { - releaseContext(&p_wrapper->ctx); - continue; - } else { - SSL_set_mode(ssl, SSL_MODE_ACCEPT_MOVING_WRITE_BUFFER); - const uint8_t *data = nullptr; - uint32_t data_length; - SSL_get0_alpn_selected(ssl, &data, &data_length); - onAlpnGet(&p_wrapper->ctx, data, data_length); - p_wrapper->ctx.bio = BIO_new(static_cast(bioMethod)); - BIO_set_data(static_cast(p_wrapper->ctx.bio), &p_wrapper->ctx); - BIO_set_init(static_cast(p_wrapper->ctx.bio), 1); - BIO_set_shutdown(static_cast(p_wrapper->ctx.bio), 0); - SSL_set_bio(static_cast(p_wrapper->ctx.ssl), static_cast(p_wrapper->ctx.bio), static_cast(p_wrapper->ctx.bio)); - } - } - p_wrapper->ctx.type = Context::Type::READY; - p_wrapper->ctx.self->onConnected(&p_wrapper->ctx); - } - } -} - -int NativeIOCPServer::onAlpnSelect(const uint8_t **out, uint8_t *out_length, const uint8_t *in, uint32_t in_length) { - if (SSL_select_next_proto(const_cast(out), out_length, reinterpret_cast(servProtos.c_str()), static_cast(servProtos.length()), in, in_length) != OPENSSL_NPN_NEGOTIATED) { - return SSL_TLSEXT_ERR_NOACK; - } - return SSL_TLSEXT_ERR_OK; -} - -int NativeIOCPServer::alpnCallbackFunction([[maybe_unused]] void *ssl, const uint8_t **out, uint8_t *out_length, const uint8_t *in, uint32_t in_length, NativeIOCPServer *server) { - return server->onAlpnSelect(out, out_length, in, in_length); -} - -bool NativeIOCPServer::initConnectEx() { - auto sock = ::socket(AF_INET, SOCK_STREAM, 0); - DWORD dw_bytes; - GUID guid = WSAID_CONNECTEX; - auto rc = WSAIoctl( - sock, SIO_GET_EXTENSION_FUNCTION_POINTER, - &guid, sizeof(guid), - &connectEx, sizeof(connectEx), - &dw_bytes, nullptr, nullptr - ); - Socket::close(sock); - return rc == 0; -} - -long NativeIOCPServer::bioCtrl([[maybe_unused]] void *bio, int cmd, [[maybe_unused]] long num, [[maybe_unused]] void *ptr) { - int ret = 0; - if (cmd == BIO_CTRL_FLUSH) { - ret = 1; - } - return ret; -} - -int NativeIOCPServer::bioWrite(void *bio, const char *in, int length) { - auto ctx = static_cast(BIO_get_data(static_cast(bio))); - return static_cast(ctx->send.write(in, length)); -} - -int NativeIOCPServer::bioRead(void *bio, char *out, int length) { - auto ctx = static_cast(BIO_get_data(static_cast(bio))); - return static_cast(ctx->recv.read(out, length)); -} - -void NativeIOCPServer::setTimeout(NativeIOCPServer::Context *ctx, int64_t seconds) { - wrapperSetMutex.lock(); - ctx->timeoutEvent = wheel.delay( - [this, ctx]() { - // auto pWrapper = ctx->pWrapper; - // Socket::close(pWrapper->ctx.fd); - // pWrapper->ctx.self->getDeleteContextCallback()(&pWrapper->ctx); - // wrapperSet.erase(pWrapper); - // delete ctx->pWrapper; - ctx->timeoutEvent = nullptr; - this->onTimeout(ctx); - }, - seconds, false - ); - wrapperSetMutex.unlock(); -} - -void NativeIOCPServer::cancelTimeout(NativeIOCPServer::Context *ctx) { - if (ctx->timeoutEvent) { - wrapperSetMutex.lock(); - wheel.cancel(ctx->timeoutEvent); - ctx->timeoutEvent = nullptr; - wrapperSetMutex.unlock(); - } -} - -void NativeIOCPServer::releaseContext(Context *ctx) { - wrapperSetMutex.lock(); - auto p_wrapper = ctx->pWrapper; - wrapperSet.erase(p_wrapper); - if (p_wrapper->ctx.timeoutEvent) { - wheel.cancel(p_wrapper->ctx.timeoutEvent); - p_wrapper->ctx.timeoutEvent = nullptr; - } - wrapperSetMutex.unlock(); - Socket::close(p_wrapper->ctx.fd); - if (p_wrapper->ctx.ssl) { - SSL_free(static_cast(p_wrapper->ctx.ssl)); - p_wrapper->ctx.ssl = nullptr; - } - p_wrapper->ctx.self->getDeleteContextCallback()(&p_wrapper->ctx); - delete p_wrapper; -} diff --git a/sese/internal/win/service/iocp/NativeIOCPServer_V1.h b/sese/internal/win/service/iocp/NativeIOCPServer_V1.h deleted file mode 100644 index a25396113c..0000000000 --- a/sese/internal/win/service/iocp/NativeIOCPServer_V1.h +++ /dev/null @@ -1,334 +0,0 @@ -// Copyright 2024 libsese -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -/** - * @file NativeIOCPServer_V1.h - * @brief Native IOCP Server for Windows - * @author kaoru - * @version 0.1 - * @date September 25, 2023 - */ - -#pragma once - -#include -#include -#include -#include -#include -#include - -#include -#include -#include - -namespace sese::_windows::iocp::v1 { - -class NativeIOCPServer; -struct OverlappedWrapper; - -/// Native IOCP operational context -class NativeContext final : public io::InputStream, public io::OutputStream, public io::PeekableStream { - friend class NativeIOCPServer; - using IOBuf = sese::iocp::IOBuf; - using Node = std::unique_ptr; - - enum class Type { - READ, - WRITE, - CONNECT, - READY, - CLOSE - }; - - OverlappedWrapper *pWrapper{}; - WSABUF wsabufRead{}; - WSABUF wsabufWrite{}; - Type type{Type::READ}; - SOCKET fd{INVALID_SOCKET}; - NativeIOCPServer *self{}; - TimeoutEvent *timeoutEvent{}; - void *ssl{}; - void *bio{}; - Node readNode; - IOBuf recv{}; - io::ByteBuilder send{IOCP_WSABUF_SIZE, IOCP_WSABUF_SIZE}; - void *data{}; - -public: - /** - * Context initialization - * @param p_wrapper Overlapped wrapper - */ - explicit NativeContext(OverlappedWrapper *p_wrapper); - ~NativeContext() override; - /** - * Read content from the current connection - * @param buffer The buffer - * @param length The size of the buffer - * @return The actual amount of data read - */ - int64_t read(void *buffer, size_t length) override; - /** - * Write content to the current connection - * @param buffer The buffer - * @param length The size of the buffer - * @return The actual amount of data written - */ - int64_t write(const void *buffer, size_t length) override; - /** - * Read content from the current connection without advancing - * @param buffer The buffer - * @param length The size of the buffer - * @return The actual amount of data read - */ - int64_t peek(void *buffer, size_t length) override; - /** - * Advance in the current connection without reading content - * @param length The amount to advance - * @return The actual amount advanced - */ - int64_t trunc(size_t length) override; - /** - * Get the file descriptor of the current context connection - * @return The file descriptor - */ - [[nodiscard]] int32_t getFd() const { return static_cast(NativeContext::fd); } - /** - * Get the additional data of the current context - * @return The additional data - */ - [[nodiscard]] void *getData() const { return NativeContext::data; } - /** - * Set the additional data of the current context - * @param p_data The additional data - */ - void setData(void *p_data) { NativeContext::data = p_data; } -}; - -/// Overlapped Wrapper -struct OverlappedWrapper final { - OVERLAPPED overlapped{}; - NativeContext ctx; - - OverlappedWrapper(); -}; - -/// Native IOCP Server for Windows -class NativeIOCPServer { -public: - using Context = NativeContext; - using DeleteContextCallback = std::function; - - virtual ~NativeIOCPServer() = default; - - /** - * Initialize and start the server - * @return The result of initialization - */ - bool init(); - /** - * Terminate worker threads, release system resources, and shut down the server - */ - void shutdown(); - /** - * Post a read event - * @param ctx The operation context - */ - void postRead(Context *ctx); - /** - * Post a write event - * @param ctx The operation context - */ - void postWrite(Context *ctx); - /** - * Post a close event - * @param ctx The operation context - */ - void postClose(Context *ctx); - /** - * Post a connection event - * @param to The connection address - * @param cli_ctx The SSL client context - * @param data The additional data - */ - void postConnect(const net::IPAddress::Ptr &to, const security::SSLContext::Ptr &cli_ctx, void *data = nullptr); - /** - * Set a timeout event - * @param ctx The operation context - * @param seconds The timeout duration - */ - void setTimeout(Context *ctx, int64_t seconds); - /** - * Cancel a timeout event - * @param ctx The operation context - */ - void cancelTimeout(Context *ctx); - /** - * Default context release callback function - */ - static void onDeleteContext(Context *) {} - /** - * Connection handshake completion callback function - * @param ctx The operation context - */ - virtual void onAcceptCompleted(Context *ctx) {} - /** - * Read event trigger callback function - * @param ctx The operation context - */ - virtual void onPreRead(Context *ctx) {} - /** - * Read event completion callback function - * @param ctx The operation context - */ - virtual void onReadCompleted(Context *ctx) {} - /** - * Write event completion callback function - * @param ctx The operation context - */ - virtual void onWriteCompleted(Context *ctx) {} - /** - * Timeout event callback function - * @param ctx Operation context - */ - virtual void onTimeout(Context *ctx) {} - /** - * Pre-connection event callback function - * @param ctx The operation context - */ - virtual void onPreConnect(Context *ctx) {} - /** - * Connection event callback function - * @param ctx The operation context - */ - virtual void onConnected(Context *ctx){}; - /** - * ALPN protocol negotiation completion callback function - * @param ctx The context - * @param in The negotiation content - * @param in_length The length of the negotiation content - */ - virtual void onAlpnGet(Context *ctx, const uint8_t *in, uint32_t in_length){}; - /** - * ALPN negotiation callback function - * @param out The expected content from the peer - * @param out_length The length of the expected content from the peer - * @param in The response content - * @param in_length The length of the response content - * @return The ALPN status code - */ - int onAlpnSelect( - const uint8_t **out, uint8_t *out_length, - const uint8_t *in, uint32_t in_length - ); - -public: - /** - * Set the IP address bound to the current service. By setting this option, the server will automatically listen on the corresponding address's port. - * @param addr Target IP - */ - void setAddress(const net::IPAddress::Ptr &addr) { NativeIOCPServer::address = addr; } - /** - * Set the expected number of threads for the service - * @param number_of_threads Number of threads - */ - void setThreads(size_t number_of_threads) { NativeIOCPServer::threads = number_of_threads; } - /** - * Set the SSL context used for server listening - * @param ctx SSL context - */ - void setServCtx(const security::SSLContext::Ptr &ctx) { NativeIOCPServer::sslCtx = ctx; } - /** - * Set the ALPN protocol negotiation content for the server - * @param protos Protocol negotiation content - */ - void setServProtos(const std::string &protos) { NativeIOCPServer::servProtos = protos; } - /** - * Set the ALPN protocol negotiation content for the client - * @param protos Protocol negotiation content - */ - void setClientProtos(const std::string &protos) { NativeIOCPServer::clientProtos = protos; } - /** - * Set the server operation context destruction callback function - * @param callback Callback function - */ - void setDeleteContextCallback(const DeleteContextCallback &callback) { NativeIOCPServer::deleteContextCallback = callback; } - /** - * Get the current server listening SSL context - * @return SSL context - */ - [[nodiscard]] const security::SSLContext::Ptr &getServCtx() const { return NativeIOCPServer::sslCtx; } - /** - * Get the current service operation context destruction callback function - * @return Callback function - */ - [[nodiscard]] const DeleteContextCallback &getDeleteContextCallback() const { return NativeIOCPServer::deleteContextCallback; }; - /** - * Get the active release mode status - * @return Active release mode status - */ - bool isActiveReleaseMode() const { return NativeIOCPServer::activeReleaseMode; } - -protected: - /** - * Set the active release mode - * @param enable Whether to enable - */ - void setActiveReleaseMode(bool enable) { NativeIOCPServer::activeReleaseMode = enable; } - /** - * Release the operation context - * @param ctx The operation context to be released - */ - void releaseContext(Context *ctx); - - void acceptThreadProc(); - void eventThreadProc(); - static int alpnCallbackFunction( - void *ssl, - const uint8_t **out, uint8_t *out_length, - const uint8_t *in, uint32_t in_length, - NativeIOCPServer *server - ); - - void *connectEx{}; - bool initConnectEx(); - - static long bioCtrl(void *bio, int cmd, long num, void *ptr); - static int bioWrite(void *bio, const char *in, int length); - static int bioRead(void *bio, char *out, int length); - - std::atomic_bool isShutdown{false}; - HANDLE iocpFd{INVALID_HANDLE_VALUE}; - SOCKET listenFd{INVALID_SOCKET}; - net::IPAddress::Ptr address{}; - Thread::Ptr acceptThread{}; - - TimeWheel wheel{}; - std::set wrapperSet{}; - std::mutex wrapperSetMutex{}; - - size_t threads{2}; - std::vector eventThreadGroup{}; - DeleteContextCallback deleteContextCallback = onDeleteContext; - security::SSLContext::Ptr sslCtx{}; - void *bioMethod{}; - std::string servProtos{}; - std::string clientProtos{}; - -private: - bool activeReleaseMode = true; -}; - -} // namespace sese::_windows::iocp::v1 \ No newline at end of file diff --git a/sese/net/event/BaseEvent.h b/sese/net/event/BaseEvent.h deleted file mode 100644 index ff30be4a96..0000000000 --- a/sese/net/event/BaseEvent.h +++ /dev/null @@ -1,37 +0,0 @@ -// Copyright 2024 libsese -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -/** - * @file BaseEvent.h - * @brief Base class for network events - * @author kaoru - */ - -#pragma once - -#define EVENT_NULL 0x0u -#define EVENT_READ 0x1u -#define EVENT_WRITE 0x2u -#define EVENT_ERROR 0x4u - -namespace sese::event { -/// Base class for network events -struct BaseEvent { - virtual ~BaseEvent() = default; - - int fd{0}; - unsigned int events{EVENT_NULL}; - void *data{nullptr}; -}; -} // namespace sese::event \ No newline at end of file diff --git a/sese/net/event/BaseEventConvert.h b/sese/net/event/BaseEventConvert.h deleted file mode 100644 index f8db360f9f..0000000000 --- a/sese/net/event/BaseEventConvert.h +++ /dev/null @@ -1,35 +0,0 @@ -// Copyright 2024 libsese -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -/** - * @file BaseEventConvert.h - * @brief Basic network event converter interface - * @author kaoru - */ - -#pragma once - -#include "sese/net/event/BaseEvent.h" - -namespace sese::event { -/// Basic network event converter interface -class BaseEventConvert { -public: - virtual ~BaseEventConvert() = default; - - virtual unsigned int fromNativeEvent(int event) = 0; - - virtual int toNativeEvent(unsigned int event) = 0; -}; -} // namespace sese::event \ No newline at end of file diff --git a/sese/net/event/BaseEventLoop.h b/sese/net/event/BaseEventLoop.h deleted file mode 100644 index eb95aae384..0000000000 --- a/sese/net/event/BaseEventLoop.h +++ /dev/null @@ -1,61 +0,0 @@ -// Copyright 2024 libsese -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -/** - * @file BaseEventLoop.h - * @brief Basic network event loop interface - * @author kaoru - */ - -#pragma once - -#include - -#include - -namespace sese::event { - -/// Basic network event loop interface -class BaseEventLoop { -public: - virtual bool init() = 0; - - virtual ~BaseEventLoop() = default; - - virtual void dispatch(uint32_t timeout) = 0; - - virtual void onAccept(int fd) = 0; - - virtual void onRead(BaseEvent *event) = 0; - - virtual void onWrite(BaseEvent *event) = 0; - - virtual void onError(BaseEvent *event) = 0; - - virtual void onClose(BaseEvent *event) = 0; - - virtual BaseEvent *createEvent(int fd, unsigned int events, void *data) = 0; - - virtual void freeEvent(BaseEvent *event) = 0; - - virtual bool setEvent(BaseEvent *event) = 0; - - virtual void setListenFd(int fd) = 0; - -protected: - /// Indicates whether the current loop is handling a close event - bool handleClose = true; -}; - -} // namespace sese::event \ No newline at end of file diff --git a/sese/net/event/Event.h b/sese/net/event/Event.h deleted file mode 100644 index 81c05075e8..0000000000 --- a/sese/net/event/Event.h +++ /dev/null @@ -1,61 +0,0 @@ -// Copyright 2024 libsese -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -/** - * @file Event.h - * @brief Network event structure association - * @author kaoru - */ - -#pragma once - -#if defined(__linux__) - -#include "sese/internal/linux/net/event/EpollEvent.h" -#include "sese/internal/linux/net/event/EpollEventLoop.h" -#include "sese/internal/linux/net/event/EpollEventConvert.h" - -namespace sese::event { -using Event = BaseEvent; -using EventLoop = EpollEventLoop; -using EventConvert = EpollEventConvert; -} // namespace sese::event - -#endif - -#if defined(_WIN32) - -#include "sese/internal/win/net/event/WSAEvent.h" -#include "sese/internal/win/net/event/WSAEventLoop.h" -#include "sese/internal/win/net/event/WSAEventConvert.h" - -namespace sese::event { -using Event = BaseEvent; -using EventLoop = WSAEventLoop; -using EventConvert = WSAEventConvert; -} // namespace sese::event - -#endif - -#if defined(__APPLE__) - -#include "sese/internal/darwin/net/event/KqueueEvent.h" -#include "sese/internal/darwin/net/event/KqueueEventLoop.h" - -namespace sese::event { -using Event = BaseEvent; -using EventLoop = KqueueEventLoop; -} // namespace sese::event - -#endif \ No newline at end of file diff --git a/sese/service/BalanceLoader.h b/sese/service/BalanceLoader.h deleted file mode 100644 index e95f373289..0000000000 --- a/sese/service/BalanceLoader.h +++ /dev/null @@ -1,42 +0,0 @@ -// Copyright 2024 libsese -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -/// \file BalanceLoader.h -/// \author kaoru -/// \date June 9, 2023 -/// \brief Balanced Loader - -#pragma once - -#include "sese/Config.h" - -#ifdef SESE_PLATFORM_LINUX - -#include "sese/service/SystemBalanceLoader.h" - -namespace sese::service { -using BalanceLoader = SystemBalanceLoader; -using Service = sese::event::EventLoop; -} // namespace sese::service - -#else - -#include "sese/service/UserBalanceLoader.h" - -namespace sese::service { -using BalanceLoader = UserBalanceLoader; -using Service = sese::event::EventLoop; -} // namespace sese::service - -#endif \ No newline at end of file diff --git a/sese/service/SystemBalanceLoader.cpp b/sese/service/SystemBalanceLoader.cpp deleted file mode 100644 index 6caf116b1b..0000000000 --- a/sese/service/SystemBalanceLoader.cpp +++ /dev/null @@ -1,83 +0,0 @@ -// Copyright 2024 libsese -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include "sese/service/SystemBalanceLoader.h" - -#include - -sese::service::SystemBalanceLoader::~SystemBalanceLoader() noexcept { - if (_isStart && !_isStop) { - stop(); - } - - if (!eventLoopVector.empty()) { - for (decltype(auto) event_loop: eventLoopVector) { - delete event_loop; - } - eventLoopVector.clear(); - } - - if (!socketVector.empty()) { - for (decltype(auto) sub_socket: socketVector) { - sese::net::Socket::close(sub_socket); - } - socketVector.clear(); - } -} - -void sese::service::SystemBalanceLoader::setThreads(size_t th) noexcept { - if (!_isStart) { - threads = std::max(th, 1); - } -} - -void sese::service::SystemBalanceLoader::start() noexcept { - if (eventLoopVector.empty()) return; - - for (size_t i = 0; i < threads; ++i) { - auto thread = std::make_unique( - [this, event_loop = this->eventLoopVector[i]]() { - while (!_isStop) { - event_loop->dispatch(100); - } - }, - "SBL_" + std::to_string(i) - ); - thread->start(); - threadVector.emplace_back(std::move(thread)); - } - - _isStart = true; -} - -void sese::service::SystemBalanceLoader::stop() noexcept { - if (_isStart) { - _isStop = true; - - for (decltype(auto) th: threadVector) { - th->join(); - } - threadVector.clear(); - - for (decltype(auto) event_loop: eventLoopVector) { - delete event_loop; - } - eventLoopVector.clear(); - - for (decltype(auto) sub_socket: socketVector) { - sese::net::Socket::close(sub_socket); - } - socketVector.clear(); - } -} diff --git a/sese/service/SystemBalanceLoader.h b/sese/service/SystemBalanceLoader.h deleted file mode 100644 index c0f29bf97c..0000000000 --- a/sese/service/SystemBalanceLoader.h +++ /dev/null @@ -1,148 +0,0 @@ -// Copyright 2024 libsese -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -/// \file SystemBalanceLoader.h -/// \brief System Balance Loader -/// \author kaoru -/// \version 0.1 -/// \date June 3, 2023 - -#pragma once - -#include "sese/net/event/Event.h" -#include "sese/net/ReusableSocket.h" -#include "sese/thread/Thread.h" - -#include -#include -#include - -namespace sese::service { - -/// \brief System Balance Loader (non-user scheduled loader) -/// \warning This loader is effective only on Linux -/// \see sese::service::BalanceLoader -class SystemBalanceLoader final { -public: - ~SystemBalanceLoader() noexcept; - - /// Set the number of threads used by the loader - /// \param th Number of threads - void setThreads(size_t th) noexcept; - - /// Set the service startup address - /// \param addr IP Address - void setAddress(const net::IPAddress::Ptr &addr) noexcept { SystemBalanceLoader::address = addr; } - - /// Set the timeout for dispatching from threads - /// \param to Timeout in milliseconds - void setAcceptTimeout(uint32_t to) noexcept { SystemBalanceLoader::timeout = to; } - - /// Set the timeout for dispatching from threads - /// \param to Timeout in milliseconds - void setDispatchTimeout(uint32_t to) noexcept { SystemBalanceLoader::timeout = to; } - - /// Get the current loader status - /// \return The status of the loader - [[nodiscard]] bool isStarted() const { return _isStart; } - - /// Initialize loader resources - /// \tparam SERVICE The service to be started - /// \return Whether the initialization was successful - template - bool init() noexcept; - - /// Initialize balancer resources - /// \tparam SERVICE The service to be started - /// \param creator Service creation function, which returns an instance pointer if creation is successful, otherwise should return nullptr indicating failure - /// \return Whether the initialization was successful - template - bool init(std::function creator) noexcept; - - /// Start the current loader and service - void start() noexcept; - - /// Shut down the current loader and unload the service - void stop() noexcept; - -protected: - std::atomic_bool _isStart{false}; - std::atomic_bool _isStop{false}; - - uint32_t timeout = 100; - size_t threads{2}; - std::vector socketVector; - std::vector eventLoopVector; - std::vector threadVector; - sese::net::IPAddress::Ptr address = sese::net::IPv4Address::localhost(8080); -}; -} // namespace sese::service - -template -bool sese::service::SystemBalanceLoader::init() noexcept { - return sese::service::SystemBalanceLoader::init([]() -> SERVICE * { return new SERVICE; }); -} - -// GCOVR_EXCL_START - -template -bool sese::service::SystemBalanceLoader::init(std::function creator) noexcept { - if (address == nullptr) return false; - - sese::net::ReusableSocket reusable_socket(address); - for (size_t i = 0; i < threads; ++i) { - auto sub_socket = reusable_socket.makeRawSocket(); - if (sub_socket == -1) { - goto freeSocket; - } - if (0 != sese::net::Socket::setNonblocking(sub_socket)) { - goto freeSocket; - } - if (0 != sese::net::Socket::listen(sub_socket, 32)) { - goto freeSocket; - } - socketVector.emplace_back(sub_socket); - } - - for (size_t i = 0; i < threads; ++i) { - auto event = creator(); - if (event == nullptr) { - goto freeEvent; - } - event->setListenFd(static_cast(socketVector[i])); - if (!event->init()) { - delete event; - goto freeEvent; - } else { - eventLoopVector.emplace_back(event); - } - } - - return true; - -freeEvent: - for (decltype(auto) event_loop: eventLoopVector) { - delete event_loop; - } - eventLoopVector.clear(); - -freeSocket: - for (decltype(auto) sub_socket: socketVector) { - sese::net::Socket::close(sub_socket); - } - socketVector.clear(); - return false; -} - -// GCOVR_EXCL_STOP \ No newline at end of file diff --git a/sese/service/TcpTransporter.cpp b/sese/service/TcpTransporter.cpp deleted file mode 100644 index 7ddd9fbd3d..0000000000 --- a/sese/service/TcpTransporter.cpp +++ /dev/null @@ -1,254 +0,0 @@ -// Copyright 2024 libsese -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include -#include - -#include - -void sese::service::TcpTransporterConfig::freeConnection(sese::service::TcpConnection *conn) { - delete conn; -} - -sese::service::TcpTransporter::TcpTransporter(sese::service::TcpTransporterConfig *transporter_config) noexcept { - this->config = transporter_config; - - if (this->config->servCtx) { - // ALPN - SSL_CTX_set_alpn_select_cb( - static_cast(config->servCtx->getContext()), - reinterpret_cast(&alpnCallbackFunction), - this - ); - } -} - -sese::service::TcpTransporter::~TcpTransporter() { - for (auto &item: eventMap) { - auto event = item.second; - auto conn = static_cast(event->data); - if (conn->timeoutEvent) { - this->freeTimeoutEvent(conn->timeoutEvent); - } - if (config->servCtx) { - SSL_free(static_cast(conn->ssl)); - } - sese::net::Socket::close(event->fd); - // delete conn; - config->freeConnection(conn); - this->freeEvent(event); - } - eventMap.clear(); -} - -void sese::service::TcpTransporter::onAccept(int fd) { - SSL *client_ssl; - // GCOVR_EXCL_START - if (sese::net::Socket::setNonblocking(fd)) { - sese::net::Socket::close(fd); - } - - // auto conn = new TcpConnection; - auto conn = config->createConnection(); - - // GCOVR_EXCL_STOP - if (config->servCtx) { - client_ssl = SSL_new(static_cast(config->servCtx->getContext())); - SSL_set_fd(client_ssl, (int) fd); - SSL_set_accept_state(client_ssl); - - // GCOVR_EXCL_START - while (true) { - auto rt = SSL_do_handshake(client_ssl); - if (rt <= 0) { - auto err = SSL_get_error(client_ssl, rt); - if (err != SSL_ERROR_WANT_READ && err != SSL_ERROR_WANT_WRITE) { - SSL_free(client_ssl); - sese::net::Socket::close(fd); - // delete conn; - config->freeConnection(conn); - return; - } - } else { - conn->ssl = client_ssl; - // This option allows OpenSSL to use different buffer parameters when trying to retry SSL_write - // https://stackoverflow.com/questions/2997218/why-am-i-getting-error1409f07fssl-routinesssl3-write-pending-bad-write-retr - SSL_set_mode(client_ssl, SSL_MODE_ACCEPT_MOVING_WRITE_BUFFER); - // ALPN Callback - const uint8_t *data = nullptr; - uint32_t data_length; - SSL_get0_alpn_selected(client_ssl, &data, &data_length); - onProcAlpnGet(conn, data, data_length); - break; - } - } - // GCOVR_EXCL_STOP - } - - if (config->keepalive) { - conn->timeoutEvent = createTimeoutEvent(fd, conn, config->keepalive); - } - conn->event = createEventEx(fd, EVENT_READ, conn); -} - -void sese::service::TcpTransporter::onRead(sese::event::BaseEvent *event) { - auto conn = static_cast(event->data); - - if (config->keepalive && conn->timeoutEvent) { - cancelTimeoutEvent(conn->timeoutEvent); - } - - char buf[MTU_VALUE]; - while (true) { - auto l = read(event->fd, buf, MTU_VALUE, conn->ssl); - if (l <= 0) { - auto err = sese::net::getNetworkError(); - if (err == ENOTCONN) { - if (conn->timeoutEvent) { - TimerableService::freeTimeoutEvent(conn->timeoutEvent); - } - onProcClose(conn); - if (config->servCtx) { - SSL_free(static_cast(conn->ssl)); - } - sese::net::Socket::close(event->fd); - // delete conn; - config->freeConnection(conn); - this->freeEventEx(event); - break; - } else { - onProcHandle(conn); - break; - } - } else { - conn->buffer2read.write(buf, l); - } - } -} - -void sese::service::TcpTransporter::onWrite(sese::event::BaseEvent *event) { - auto conn = static_cast(event->data); - char buf[MTU_VALUE]; - while (true) { - auto len = conn->buffer2write.peek(buf, MTU_VALUE); - if (len == 0) { - conn->buffer2write.freeCapacity(); - break; - } - auto l = write(event->fd, buf, len, conn->ssl); - if (l <= 0) { - auto err = sese::net::getNetworkError(); - if (err == EWOULDBLOCK || err == EINTR) { - conn->event->events &= ~EVENT_READ; - conn->event->events |= EVENT_WRITE; - this->setEvent(event); - break; - } else { - if (conn->timeoutEvent) { - TimerableService::freeTimeoutEvent(conn->timeoutEvent); - } - onProcClose(conn); - if (config->servCtx) { - SSL_free(static_cast(conn->ssl)); - } - sese::net::Socket::close(event->fd); - // delete conn; - config->freeConnection(conn); - this->freeEventEx(event); - break; - } - } else { - conn->buffer2write.trunc(l); - } - } - - conn->event->events |= EVENT_READ; - conn->event->events &= ~EVENT_WRITE; - this->setEvent(event); -} - -void sese::service::TcpTransporter::onClose(sese::event::BaseEvent *event) { - auto conn = static_cast(event->data); - /// \brief If a connection is processed asynchronously, - /// the peer closure event should not release resources for the connection - /// \see tcp_connection_delay_close_by_async - if (!conn->isAsync) { - if (conn->timeoutEvent) { - TimerableService::freeTimeoutEvent(conn->timeoutEvent); - } - onProcClose(conn); - if (config->servCtx) { - SSL_free(static_cast(conn->ssl)); - } - sese::net::Socket::close(event->fd); - // delete conn; - config->freeConnection(conn); - freeEventEx(event); - } -} - -sese::event::BaseEvent *sese::service::TcpTransporter::createEventEx(int fd, unsigned int events, sese::service::TcpConnection *conn) noexcept { - auto event = createEvent(fd, events, conn); - eventMap[fd] = event; - return event; -} - -void sese::service::TcpTransporter::freeEventEx(event::BaseEvent *event) noexcept { - eventMap.erase(event->fd); - freeEvent(event); -} - -int64_t sese::service::TcpTransporter::read(int fd, void *buffer, size_t len, void *ssl) noexcept { - if (ssl) { - return SSL_read(static_cast(ssl), buffer, static_cast(len)); - } else { - return sese::net::Socket::read(fd, buffer, len, 0); - } -} - -int64_t sese::service::TcpTransporter::write(int fd, const void *buffer, size_t len, void *ssl) noexcept { - if (ssl) { - return SSL_write(static_cast(ssl), buffer, static_cast(len)); - } else { - return sese::net::Socket::write(fd, buffer, len, 0); - } -} - -void sese::service::TcpTransporter::onTimeout(sese::service::v1::TimeoutEvent *timeout_event) { - auto conn = static_cast(timeout_event->data); - onProcClose(conn); - if (config->servCtx) { - SSL_free(static_cast(conn->ssl)); - } - sese::net::Socket::close(conn->event->fd); - this->freeEventEx(conn->event); - // delete conn; - config->freeConnection(conn); -} - -void sese::service::TcpTransporter::postRead(TcpConnection *conn) { - conn->event->events &= ~EVENT_WRITE; - conn->event->events |= EVENT_READ; - setEvent(conn->event); -} - -void sese::service::TcpTransporter::postWrite(TcpConnection *conn) { - conn->event->events |= EVENT_WRITE; - conn->event->events &= ~EVENT_READ; - onWrite(conn->event); -} - -int sese::service::TcpTransporter::alpnCallbackFunction([[maybe_unused]] void *ssl, const uint8_t **out, uint8_t *out_length, const uint8_t *in, uint32_t in_length, sese::service::TcpTransporter *transporter) { - return transporter->onProcAlpnSelect(out, out_length, in, in_length); -} \ No newline at end of file diff --git a/sese/service/TcpTransporter.h b/sese/service/TcpTransporter.h deleted file mode 100644 index 6a7997f80f..0000000000 --- a/sese/service/TcpTransporter.h +++ /dev/null @@ -1,109 +0,0 @@ -// Copyright 2024 libsese -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -/// \file TcpTransporter.h -/// \brief TCP Transporter -/// \author kaoru -/// \version - -#pragma once - -#include -#include -#include "sese/io/ByteBuilder.h" - -#include - -#ifdef _WIN32 -#pragma warning(disable : 4275) -#endif - -namespace sese::service { - -/// TCP Connection -struct TcpConnection { - virtual ~TcpConnection() = default; - - /// \anchor tcp_connection_delay_close_by_async - /// \brief - /// Set this variable to indicate that this connection is being asynchronously processed. - /// When an onClose event occurs, resource release operations will not be performed. - /// The release operation will be delayed at least until the disconnection is detected - /// during a write or read event (onWrite, onRead). - /// After asynchronous operations are completed, this flag should be set back to false. - std::atomic_bool isAsync = false; - - void *ssl = nullptr; - event::BaseEvent *event = nullptr; - service::v1::TimeoutEvent *timeoutEvent = nullptr; - io::ByteBuilder buffer2read{8192}; - io::ByteBuilder buffer2write{8192}; -}; - -/// TCP transporter configuration -struct TcpTransporterConfig { - uint32_t keepalive = 30; - security::SSLContext::Ptr servCtx = nullptr; - - virtual ~TcpTransporterConfig() = default; - virtual TcpConnection *createConnection() = 0; - virtual void freeConnection(TcpConnection *conn); -}; - -/// TCP transporter -class TcpTransporter : public v1::TimerableService { -public: - explicit TcpTransporter(TcpTransporterConfig *transporter_config) noexcept; - ~TcpTransporter() override; - -protected: - void onAccept(int fd) override; - void onRead(event::BaseEvent *event) override; - void onWrite(event::BaseEvent *event) override; - void onClose(event::BaseEvent *event) override; - void onTimeout(v1::TimeoutEvent *timeout_event) override; - -protected: - virtual void postRead(TcpConnection *conn); - virtual void postWrite(TcpConnection *conn); - virtual int onProcAlpnSelect( - const uint8_t **out, uint8_t *out_length, - const uint8_t *in, uint32_t in_length - ) = 0; - virtual void onProcAlpnGet( - TcpConnection *conn, - const uint8_t *in, uint32_t in_length - ) = 0; - virtual void onProcHandle(TcpConnection *conn) = 0; - virtual void onProcClose(TcpConnection *conn) = 0; - -protected: - event::BaseEvent *createEventEx(int fd, unsigned int events, TcpConnection *conn) noexcept; - void freeEventEx(event::BaseEvent *event) noexcept; - static int64_t read(int fd, void *buffer, size_t len, void *ssl) noexcept; - static int64_t write(int fd, const void *buffer, size_t len, void *ssl) noexcept; - - TcpTransporterConfig *config = nullptr; - std::map eventMap; - -private: - static int alpnCallbackFunction( - void *ssl, - const uint8_t **out, uint8_t *out_length, - const uint8_t *in, uint32_t in_length, - TcpTransporter *transporter - ); -}; - -} // namespace sese::service \ No newline at end of file diff --git a/sese/service/TimerableService.h b/sese/service/TimerableService.h deleted file mode 100644 index 4430e085f5..0000000000 --- a/sese/service/TimerableService.h +++ /dev/null @@ -1,32 +0,0 @@ -// Copyright 2024 libsese -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -/// \file TimerableService.h -/// \author kaoru -/// \date September 16, 2023 -/// \brief Timerable Service -/// \version 0.1.0 - -#pragma once - -#include -#include - -namespace sese::service { -/// Timeout event struct -typedef v2::TimeoutEvent TimeoutEvent; - -/// Timerable Service -typedef v2::TimerableService TimerableService; -} // namespace sese::service \ No newline at end of file diff --git a/sese/service/TimerableService_V1.cpp b/sese/service/TimerableService_V1.cpp deleted file mode 100644 index 016367f830..0000000000 --- a/sese/service/TimerableService_V1.cpp +++ /dev/null @@ -1,136 +0,0 @@ -// Copyright 2024 libsese -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include "sese/service/TimerableService_V1.h" - -#include -#include - -using sese::service::v1::TimerableService; -using sese::service::v1::TimeoutEvent; - -TimerableService::~TimerableService() { - if (!timeoutMap.empty()) { - std::for_each(timeoutMap.begin(), timeoutMap.end(), [](std::pair pair) -> void { - delete pair.second; - }); - timeoutMap.clear(); - } -} - -void TimerableService::onTimeout(TimeoutEvent *timeout_event) { -} - -TimeoutEvent *TimerableService::createTimeoutEvent(int fd, void *data, uint64_t seconds) { - auto event = new TimeoutEvent; - timeoutMap[fd] = event; - - event->fd = fd; - event->data = data; - event->exceptTimestamp = std::chrono::time_point_cast(std::chrono::system_clock::now()).time_since_epoch().count() + seconds; - - auto index = (event->exceptTimestamp - startTimestamp) % 60; - // SESE_INFO("create event at %d", (int) index); - auto &table = timeoutTable[index]; - table.emplace_back(event); - - return event; -} - -void TimerableService::setTimeoutEvent(TimeoutEvent *timeout_event, uint64_t seconds) { - // If there is a pre-existing event, cancel it first - { - auto index = (timeout_event->exceptTimestamp - startTimestamp) % 60; - // SESE_INFO("cancel event at %d", (int) index); - auto &table = timeoutTable[index]; - table.remove(timeout_event); - } - // auto iterator = std::find_if(table.begin(), table.end(), [&](TimeoutEvent *event) -> bool { - // return timeoutEvent->fd == event->fd; - // }); - // if (iterator != table.end()) { - // table.erase(iterator); - // } - - // Set up a new event - { - - timeout_event->exceptTimestamp = std::chrono::time_point_cast(std::chrono::system_clock::now()).time_since_epoch().count() + seconds; - auto index = (timeout_event->exceptTimestamp - startTimestamp) % 60; - auto &table = timeoutTable[index]; - // SESE_INFO("new event at %d", (int) index); - table.emplace_back(timeout_event); - } -} - -TimeoutEvent *TimerableService::getTimeoutEventByFd(int fd) { - auto iterator = timeoutMap.find(fd); - if (iterator == timeoutMap.end()) { - return nullptr; - } else { - return iterator->second; - } -} - -void TimerableService::cancelTimeoutEvent(TimeoutEvent *timeout_event) { - // If there is a pre-existing event, cancel it first - auto index = (timeout_event->exceptTimestamp - startTimestamp) % 60; - // SESE_INFO("cancel event at %d", (int) index); - auto &table = timeoutTable[index]; - - table.remove(timeout_event); - // auto iterator = std::find_if(table.begin(), table.end(), [&](TimeoutEvent *event) -> bool { - // return timeoutEvent->fd == event->fd; - // }); - // if (iterator != table.end()) { - // table.erase(iterator); - // } -} - -void TimerableService::freeTimeoutEvent(TimeoutEvent *timeout_event) { - auto index = (timeout_event->exceptTimestamp - startTimestamp) % 60; - auto &table = timeoutTable[index]; - // SESE_INFO("free %d at %d", timeoutEvent->fd, (int) index); - table.remove(timeout_event); - - auto iterator = timeoutMap.find(timeout_event->fd); - if (iterator != timeoutMap.end()) { - delete iterator->second; - timeoutMap.erase(iterator); - } -} - -void TimerableService::dispatch(uint32_t timeout) { - auto now = static_cast(std::chrono::time_point_cast(std::chrono::system_clock::now()).time_since_epoch().count()); - auto index = (now - startTimestamp) % 60; - auto &table = timeoutTable[index]; - - for (auto iterator = table.begin(); iterator != table.end();) { - if ((*iterator)->exceptTimestamp <= now) { - onTimeout(*iterator); - timeoutMap.erase((*iterator)->fd); - delete (*iterator); - table.erase(iterator++); - } else { - iterator++; - } - } - - event::EventLoop::dispatch(timeout); -} - -bool TimerableService::init() { - startTimestamp = std::chrono::time_point_cast(std::chrono::system_clock::now()).time_since_epoch().count(); - return event::EventLoop::init(); -} diff --git a/sese/service/TimerableService_V1.h b/sese/service/TimerableService_V1.h deleted file mode 100644 index e052e7f79d..0000000000 --- a/sese/service/TimerableService_V1.h +++ /dev/null @@ -1,94 +0,0 @@ -// Copyright 2024 libsese -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -/// \file TimerableService_V1.h -/// \author kaoru -/// \date June 9, 2023 -/// \brief Timerable Service -/// \version 0.1.0 - -#pragma once - -#include "sese/net/event/Event.h" - -#include -#include - -namespace sese::service::v1 { - -/// Timeout event structure -struct TimeoutEvent { - /// File descriptor - int fd{0}; - /// Expected timestamp - uint64_t exceptTimestamp{0}; - /// Extra data - void *data{nullptr}; -}; - -/// Timerable Service -class TimerableService : public event::EventLoop { -public: - ~TimerableService() override; - - bool init() override; - - void dispatch(uint32_t timeout) override; - -public: - /// Triggered when a timeout event occurs - /// \param timeout_event Timeout event - virtual void onTimeout(TimeoutEvent *timeout_event); - - /// Create a timeout event - /// \note Each file descriptor can correspond to only one timeout event.

- /// Repeating the file descriptor will cause the original timeout event to be overwritten.

- /// Typically, file descriptors refer to socket file descriptors.

- /// For custom timeout events unrelated to sockets,

- /// the file descriptor can be set to a negative number less than -1.

- /// The timeout service does not care about the sign of the file descriptor. - /// \param fd File descriptor - /// \param data Extra data - /// \param seconds Timeout in seconds - /// \return Timeout event structure - TimeoutEvent *createTimeoutEvent(int fd, void *data, uint64_t seconds); - - /// Reset the current timeout event, the original event will be canceled and overwritten - /// \see createTimoutEvent - /// \param timeout_event Timeout event structure - /// \param seconds Timeout in seconds - void setTimeoutEvent(TimeoutEvent *timeout_event, uint64_t seconds); - - /// Get the timeout event structure by file descriptor - /// \param fd File descriptor - /// \retval nullptr The corresponding timeout event does not exist - /// \return The corresponding timeout event structure - TimeoutEvent *getTimeoutEventByFd(int fd); - - /// Cancel the current timeout event - /// \param timeout_event Timeout event structure - void cancelTimeoutEvent(TimeoutEvent *timeout_event); - - /// Release the current timeout event structure - /// \param timeout_event Timeout event structure - void freeTimeoutEvent(TimeoutEvent *timeout_event); - -private: - uint64_t startTimestamp{0}; - - std::map timeoutMap; - std::list timeoutTable[60]{}; -}; - -} // namespace sese::service \ No newline at end of file diff --git a/sese/service/TimerableService_V2.cpp b/sese/service/TimerableService_V2.cpp deleted file mode 100644 index 9f543907ef..0000000000 --- a/sese/service/TimerableService_V2.cpp +++ /dev/null @@ -1,45 +0,0 @@ -// Copyright 2024 libsese -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include - -using namespace sese::event; -using namespace sese::service::v2; - -void TimerableService::dispatch(uint32_t timeout) { - timeWheel.check(); - EventLoop::dispatch(timeout); -} - -void TimerableService::onTimeout(v2::TimeoutEvent *event) { -} - -TimeoutEvent *TimerableService::setTimeoutEvent(int64_t seconds, void *data) { - auto event = new v2::TimeoutEvent; - event->event = timeWheel.delay( - [this, event]() { - this->onTimeout(event); - delete event; - }, - seconds, - false - ); - event->data = data; - return event; -} - -void TimerableService::cancelTimeoutEvent(v2::TimeoutEvent *event) { - timeWheel.cancel(event->event); - delete event; -} \ No newline at end of file diff --git a/sese/service/TimerableService_V2.h b/sese/service/TimerableService_V2.h deleted file mode 100644 index a273161b68..0000000000 --- a/sese/service/TimerableService_V2.h +++ /dev/null @@ -1,65 +0,0 @@ -// Copyright 2024 libsese -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -/// \file TimerableService_V2.h -/// \author kaoru -/// \date September 19, 2023 -/// \brief Timerable Service -/// \version 0.2.0 - -#pragma once - -#include -#include - -#if defined(_MSC_VER) -#pragma warning(disable : 4275) -#endif - -namespace sese::service::v2 { - -/// Timeout event structure -struct TimeoutEvent { - sese::TimeoutEvent *event{nullptr}; - /// Additional data - void *data{nullptr}; -}; - -/// Timerable Service -class TimerableService : public event::EventLoop { -public: - /// Dispatch events - /// \param timeout Timeout duration - void dispatch(uint32_t timeout) override; - - /// Timeout callback function - /// \param event Event - virtual void onTimeout(v2::TimeoutEvent *event); - - /// Set a timeout event - /// \param seconds Timeout duration - /// \param data Additional data - /// \return Timeout event structure - v2::TimeoutEvent *setTimeoutEvent(int64_t seconds, void *data); - - /// Cancel and free the timeout event - /// \param event Timeout event - void cancelTimeoutEvent(v2::TimeoutEvent *event); - -private: - /// Time wheel - TimeWheel timeWheel{}; -}; - -} // namespace sese::service diff --git a/sese/service/UserBalanceLoader.cpp b/sese/service/UserBalanceLoader.cpp deleted file mode 100644 index a79de047dd..0000000000 --- a/sese/service/UserBalanceLoader.cpp +++ /dev/null @@ -1,195 +0,0 @@ -// Copyright 2024 libsese -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include "sese/service/UserBalanceLoader.h" - -#include - -sese::service::UserBalanceLoader::~UserBalanceLoader() noexcept { - if (_isStart && !_isStop) { - stop(); - } - - if (masterSocketQueueArray) { - delete[] masterSocketQueueArray; - masterSocketQueueArray = nullptr; - } - - if (slaveSocketQueueArray) { - delete[] slaveSocketQueueArray; - slaveSocketQueueArray = nullptr; - } - - if (!eventLoopVector.empty()) { - for (decltype(auto) event_loop: eventLoopVector) { - delete event_loop; - } - eventLoopVector.clear(); - } - - if (masterEventLoop) { - delete masterEventLoop; - masterEventLoop = nullptr; - } - - if (socket) { - socket->close(); - delete socket; - socket = nullptr; - } -} - -void sese::service::UserBalanceLoader::setThreads(size_t th) noexcept { - if (!_isStart) { - threads = std::max(th, 1); - } -} - -void sese::service::UserBalanceLoader::start() noexcept { - if (eventLoopVector.empty()) return; - - masterThread = std::make_unique(std::bind(&UserBalanceLoader::master, this), "UBL_0"); // NOLINT - masterThread->start(); - - for (size_t i = 0; i < threads; ++i) { - auto thread = std::make_unique( - std::bind( // NOLINT - &UserBalanceLoader::slave, - this, eventLoopVector[i], - &masterSocketQueueArray[i], - &slaveSocketQueueArray[i], - &mutexArray[i] - ), - "UBL_" + std::to_string(i + 1) - ); - thread->start(); - threadVector.emplace_back(std::move(thread)); - } - - _isStart = true; -} - -void sese::service::UserBalanceLoader::stop() noexcept { - if (_isStart) { - _isStop = true; - - masterThread->join(); - masterThread = nullptr; - - for (decltype(auto) th: threadVector) { - th->join(); - } - threadVector.clear(); - - delete[] mutexArray; - mutexArray = nullptr; - - for (decltype(auto) event_loop: eventLoopVector) { - delete event_loop; - } - eventLoopVector.clear(); - - for (int i = 0; i < threads; ++i) { - decltype(auto) master_queue = masterSocketQueueArray[i]; - decltype(auto) slave_queue = slaveSocketQueueArray[i]; - while (!master_queue.empty()) { - const auto CLIENT = master_queue.front(); - master_queue.pop(); - sese::net::Socket::close(CLIENT.fd); - } - while (!slave_queue.empty()) { - const auto CLIENT = slave_queue.front(); - slave_queue.pop(); - sese::net::Socket::close(CLIENT.fd); - } - } - - delete[] masterSocketQueueArray; - delete[] slaveSocketQueueArray; - this->masterSocketQueueArray = nullptr; - this->slaveSocketQueueArray = nullptr; - - socket->close(); - delete socket; - socket = nullptr; - } -} - -void sese::service::UserBalanceLoader::dispatchSocket(sese::socket_t sock, void *data) { - auto index = sock % threads; - mutexArray[index].lock(); - masterSocketQueueArray[index].push({sock, data}); - mutexArray[index].unlock(); -} - -void sese::service::UserBalanceLoader::master() noexcept { - while (!_isStop) { - masterEventLoop->dispatch(acceptTimeout); - -#ifdef WIN32 - socket_t last = 0; -#endif - while (!masterEventLoop->socketQueue.empty()) { - auto fd = masterEventLoop->socketQueue.front(); - masterEventLoop->socketQueue.pop(); -#ifdef WIN32 - // Windows Socket values are generally reused and are multiples of 4 - int factor = 7; - if (last == fd) { - factor = 3; - } - auto index = (fd / factor) % threads; - last = fd; -#else - auto index = fd % threads; -#endif - - // size_t index; - // for (size_t i = 0; i < threads; ++i) { - // index = std::min(masterSocketQueueArray[i].size(), index); - // } - - mutexArray[index].lock(); - masterSocketQueueArray[index].push({fd, nullptr}); - mutexArray[index].unlock(); - } - } -} - -void sese::service::UserBalanceLoader::slave(sese::event::EventLoop *event_loop, std::queue *master_queue, std::queue *slave_queue, std::mutex *mutex) noexcept { - while (!_isStop) { - mutex->lock(); - if (!master_queue->empty()) { - master_queue->swap(*slave_queue); - mutex->unlock(); - while (!slave_queue->empty()) { - auto client = slave_queue->front(); - slave_queue->pop(); - if (client.data == nullptr) { - // This pretends to be an access client provided by one of the implementations - event_loop->onAccept(static_cast(client.fd)); - } else if (onDispatchedCallbackFunction) { - onDispatchedCallbackFunction(static_cast(client.fd), event_loop, client.data); - } - } - } else { - mutex->unlock(); - } - event_loop->dispatch(dispatchTimeout); - } -} - -void sese::service::MasterEventLoop::onAccept(int fd) { - this->socketQueue.emplace(fd); -} \ No newline at end of file diff --git a/sese/service/UserBalanceLoader.h b/sese/service/UserBalanceLoader.h deleted file mode 100644 index 16192dc608..0000000000 --- a/sese/service/UserBalanceLoader.h +++ /dev/null @@ -1,206 +0,0 @@ -// Copyright 2024 libsese -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -/// \file UserBalanceLoader.h -/// \brief User Load Balancer -/// \author kaoru -/// \version 0.1 -/// \date June 5, 2023 - -#pragma once - -#include "sese/net/event/Event.h" -#include "sese/net/Socket.h" -#include "sese/thread/Thread.h" - -#include -#include -#include - -namespace sese::service { - -class MasterEventLoop; - -/// User load balancer, applicable to all platforms -class UserBalanceLoader { -public: - ~UserBalanceLoader() noexcept; - - /// Set the number of threads used by the loader - /// \param th Number of threads - void setThreads(size_t th) noexcept; - - /// Set the service startup address - /// \param addr IP Address - void setAddress(const net::IPAddress::Ptr &addr) noexcept { UserBalanceLoader::address = addr; } // GCOVR_EXCL_LINE - - /// Set the timeout for the main listening thread - /// \param to Timeout in milliseconds - void setAcceptTimeout(uint32_t to) noexcept { UserBalanceLoader::acceptTimeout = to; } - - /// Set the dispatch timeout for worker threads - /// \param to Timeout in milliseconds - void setDispatchTimeout(uint32_t to) noexcept { UserBalanceLoader::dispatchTimeout = to; } - - /// Get the current loader status - /// \return Loader status - [[nodiscard]] bool isStarted() const { return _isStart; } // GCOVR_EXCL_LINE - - /// Initialize loader resources - /// \tparam SERVICE Service to be started - /// \return Whether initialization is successful - template - bool init() noexcept; - - /// Initialize balancer resources - /// \tparam SERVICE Service to be started - /// \param creator Service creation function, returns an instance pointer on success, otherwise should return nullptr - /// \return Whether initialization is successful - template - bool init(std::function creator) noexcept; - - /// Start the current loader and service - void start() noexcept; - - /// Stop the current loader and unload services - void stop() noexcept; - - void dispatchSocket(socket_t sock, void *data); - - void setOnDispatchedCallbackFunction(const std::function &cb) { UserBalanceLoader::onDispatchedCallbackFunction = cb; } - -protected: - /// Socket status - struct SocketStatus { - socket_t fd{}; - void *data{}; - }; - - void master() noexcept; - - void slave( - sese::event::EventLoop *event_loop, - std::queue *master_queue, - std::queue *slave_queue, - std::mutex *mutex - ) noexcept; - -protected: - std::atomic_bool _isStart{false}; - std::atomic_bool _isStop{false}; - - uint32_t acceptTimeout = 100; - uint32_t dispatchTimeout = 100; - size_t threads{2}; - std::vector eventLoopVector; - std::vector threadVector; - sese::net::IPAddress::Ptr address = sese::net::IPv4Address::localhost(8080); - - sese::net::Socket *socket{nullptr}; - sese::service::MasterEventLoop *masterEventLoop{nullptr}; - sese::Thread::Ptr masterThread{nullptr}; - /// socket_t exchange queues - std::queue *masterSocketQueueArray{nullptr}; - std::queue *slaveSocketQueueArray{nullptr}; - std::mutex *mutexArray{nullptr}; - - std::function onDispatchedCallbackFunction; -}; - -/// User load balancer main thread -class MasterEventLoop final : public sese::event::EventLoop { -public: - void onAccept(int fd) override; - - std::queue socketQueue; -}; -} // namespace sese::service - -// The test code here is not convenient to simulate -// GCOVR_EXCL_START - -template -bool sese::service::UserBalanceLoader::init() noexcept { - return sese::service::UserBalanceLoader::init([]() -> SERVICE * { return new SERVICE; }); -} - -template -bool sese::service::UserBalanceLoader::init(std::function creator) noexcept { - if (address == nullptr) return false; - - socket = new net::Socket( - address->getRawAddress()->sa_family == AF_INET ? net::Socket::Family::IPv4 : net::Socket::Family::IPv6, - net::Socket::Type::TCP - ); - if (-1 == socket->getRawSocket()) { - return false; - } - - if (!socket->setNonblocking()) { - goto freeSocket; - } - - if (0 != socket->bind(address)) { - goto freeSocket; - } - - if (0 != socket->listen(32)) { - goto freeSocket; - } - - masterEventLoop = new MasterEventLoop; - masterEventLoop->setListenFd(static_cast(socket->getRawSocket())); - if (!masterEventLoop->init()) { - goto freeMaster; - } - - for (size_t i = 0; i < threads; ++i) { - auto event = creator(); - if (event == nullptr) { - goto freeEvent; - } - if (!event->init()) { - delete event; - goto freeEvent; - } else { - eventLoopVector.emplace_back(event); - } - } - - // Initialize exchange queues - masterSocketQueueArray = new std::queue[threads]; - slaveSocketQueueArray = new std::queue[threads]; - mutexArray = new std::mutex[threads]; - - return true; - -freeEvent: - for (decltype(auto) event_loop: eventLoopVector) { - delete event_loop; - } - eventLoopVector.clear(); - -freeMaster: - delete masterEventLoop; - masterEventLoop = nullptr; - -freeSocket: - socket->close(); - delete socket; - socket = nullptr; - return false; -} - - -// GCOVR_EXCL_STOP \ No newline at end of file diff --git a/sese/service/iocp/IOCPServer.h b/sese/service/iocp/IOCPServer.h deleted file mode 100644 index 25ad2389b4..0000000000 --- a/sese/service/iocp/IOCPServer.h +++ /dev/null @@ -1,43 +0,0 @@ -// Copyright 2024 libsese -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -/** - * @file IOCPServer.h - * @brief IOCP Server - * @author kaoru - */ - -#pragma once - -#include - -#if defined(SESE_PLATFORM_WINDOWS) -#include -#else -#include -#endif - -namespace sese::iocp { -#if defined(SESE_PLATFORM_WINDOWS) -typedef _windows::iocp::v1::NativeContext Context; -typedef _windows::iocp::v1::NativeIOCPServer IOCPServer; -namespace v1 { - typedef _windows::iocp::v1::NativeContext Context; - typedef _windows::iocp::v1::NativeIOCPServer IOCPServer; -} -#else -typedef v1::Context Context; -typedef v1::IOCPServer IOCPServer; -#endif -} // namespace sese::iocp \ No newline at end of file diff --git a/sese/service/iocp/IOCPServer_V1.cpp b/sese/service/iocp/IOCPServer_V1.cpp deleted file mode 100644 index b075ab1fc7..0000000000 --- a/sese/service/iocp/IOCPServer_V1.cpp +++ /dev/null @@ -1,406 +0,0 @@ -// Copyright 2024 libsese -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include - -#include - -using namespace sese::iocp::v1; -using namespace sese::service; - -#pragma region Context - -int64_t Context::read(void *buffer, size_t length) { - return recv.read(buffer, length); -} - -int64_t Context::write(const void *buffer, size_t length) { - return send.write(buffer, length); -} - -int64_t Context::peek(void *buffer, size_t length) { - return recv.peek(buffer, length); -} - -int64_t Context::trunc(size_t length) { - return recv.trunc(length); -} - -#pragma endregion Context - -#pragma region Service - -IOCPService::IOCPService(IOCPServer *master, bool active_release_mode) - : TimerableService() { - IOCPService::master = master; - IOCPService::handleClose = active_release_mode; - - if (master->getServCtx()) { - const auto SERVER_SSL = static_cast(master->getServCtx()->getContext()); - SSL_CTX_set_alpn_select_cb( - SERVER_SSL, - reinterpret_cast(&alpnCallbackFunction), - this - ); - } -} - -IOCPService::~IOCPService() { - for (auto &&item: eventSet) { - const auto CTX = static_cast(item->data); - if (CTX->ssl) { - // std::printf("ssl free %p\n", ctx->ssl); - SSL_free(static_cast(CTX->ssl)); - CTX->ssl = nullptr; - } - if (CTX->timeoutEvent) { - cancelTimeoutEvent(CTX->timeoutEvent); - } - sese::net::Socket::close(CTX->fd); - event::EventLoop::freeEvent(item); - CTX->self->getDeleteContextCallback()(CTX); - delete CTX; - } -} - -void IOCPService::postRead(Context *ctx) { - if (ctx->event) { - ctx->event->events &= ~EVENT_WRITE; - ctx->event->events |= EVENT_READ; - IOCPService::setEvent(ctx->event); - } else { - ctx->event = createEventEx(static_cast(ctx->fd), EVENT_READ, ctx); - ctx->event->data = ctx; - } -} - -void IOCPService::postWrite(Context *ctx) { - if (ctx->event) { - ctx->event->events &= ~EVENT_READ; - ctx->event->events |= EVENT_WRITE; - IOCPService::setEvent(ctx->event); - } else { - ctx->event = createEventEx(static_cast(ctx->fd), EVENT_WRITE, ctx); - ctx->event->data = ctx; - } -} - -void IOCPService::postClose(Context *ctx) { - if (handleClose) { - using namespace sese::net; - Socket::shutdown(ctx->fd, Socket::ShutdownMode::BOTH); - } else { - releaseContext(ctx); - } -} - -void IOCPService::onAcceptCompleted(Context *ctx) { - ctx->self->onAcceptCompleted(ctx); -} - -void IOCPService::onPreRead(Context *ctx) { - ctx->self->onPreRead(ctx); -} - -void IOCPService::onReadCompleted(Context *ctx) { - ctx->self->onReadCompleted(ctx); -} - -void IOCPService::onWriteCompleted(Context *ctx) { - ctx->self->onWriteCompleted(ctx); -} - -void IOCPService::onTimeout(Context *ctx) { - ctx->timeoutEvent = nullptr; - ctx->self->onTimeout(ctx); -} - -void IOCPService::onConnected(Context *ctx) { - ctx->self->onConnected(ctx); -} - -void IOCPService::onAlpnGet(Context *ctx, const uint8_t *in, uint32_t in_length) { - ctx->self->onAlpnGet(ctx, in, in_length); -} - -void IOCPService::onAccept(int fd) { - if (sese::net::Socket::setNonblocking(fd)) { - sese::net::Socket::close(fd); - } - - const auto CTX = new Context; - CTX->self = master; - CTX->client = this; - CTX->fd = fd; - - if (master->getServCtx()) { - const auto SERVER_SSL = static_cast(master->getServCtx()->getContext()); - SSL *client_ssl = SSL_new(SERVER_SSL); - SSL_set_fd(client_ssl, fd); - SSL_set_accept_state(client_ssl); - - while (true) { - if (const auto RT = SSL_do_handshake(client_ssl); RT <= 0) { - if (const auto ERR = SSL_get_error(client_ssl, RT); ERR != SSL_ERROR_WANT_READ && ERR != SSL_ERROR_WANT_WRITE) { - SSL_free(client_ssl); - sese::net::Socket::close(fd); - delete CTX; - return; - } - } else { - CTX->ssl = client_ssl; - SSL_set_mode(client_ssl, SSL_MODE_ACCEPT_MOVING_WRITE_BUFFER); - const uint8_t *data = nullptr; - uint32_t data_length; - SSL_get0_alpn_selected(client_ssl, &data, &data_length); - onAlpnGet(CTX, data, data_length); - break; - } - } - } - - onAcceptCompleted(CTX); -} - -void IOCPService::onRead(sese::event::BaseEvent *event) { - const auto CTX = static_cast(event->data); - - onPreRead(CTX); - - size_t len = 0; - while (true) { - char buf[MTU_VALUE]; - if (const auto L = read(static_cast(CTX->fd), buf, MTU_VALUE, CTX->ssl); L <= 0) { - if (L == 0 && len == 0) { - releaseContext(CTX); - break; - } else { - onReadCompleted(CTX); - break; - } - } else { - CTX->recv.write(buf, L); - len += static_cast(L); - } - } -} - -void IOCPService::onWrite(sese::event::BaseEvent *event) { - if (const auto CTX = static_cast(event->data); CTX->isConn) { - CTX->isConn = false; - auto ssl = static_cast(CTX->ssl); - if (CTX->ssl) { - SSL_set_connect_state(ssl); - // GCOVR_EXCL_START - while (true) { - if (const auto RT = SSL_do_handshake(ssl); RT <= 0) { - // err is SSL_ERROR_WANT_READ or SSL_ERROR_WANT_WRITE - if (const auto ERR = SSL_get_error(ssl, RT); ERR != SSL_ERROR_WANT_READ && ERR != SSL_ERROR_WANT_WRITE) { - SSL_free(ssl); - CTX->ssl = nullptr; - ssl = nullptr; - break; - } - } else { - break; - } - } - // GCOVR_EXCL_STOP - if (ssl == nullptr) { - releaseContext(CTX); - return; - } else { - SSL_set_mode(ssl, SSL_MODE_ACCEPT_MOVING_WRITE_BUFFER); - const uint8_t *data = nullptr; - uint32_t data_length; - SSL_get0_alpn_selected(ssl, &data, &data_length); - onAlpnGet(CTX, data, data_length); - } - } - onConnected(CTX); - } else { - while (true) { - char buf[MTU_VALUE]; - const auto LEN = CTX->send.peek(buf, MTU_VALUE); - if (LEN == 0) { - CTX->send.freeCapacity(); - onWriteCompleted(CTX); - break; - } - if (const auto L = write(static_cast(CTX->fd), buf, LEN, CTX->ssl); L <= 0) { - if (const auto ERR = sese::net::getNetworkError(); ERR == EWOULDBLOCK || ERR == EINTR) { - postWrite(CTX); - break; - } else { - releaseContext(CTX); - break; - } - } else { - CTX->send.trunc(L); - } - } - } -} - -void IOCPService::onClose(sese::event::BaseEvent *event) { - const auto CTX = static_cast(event->data); - releaseContext(CTX); -} - -void IOCPService::onTimeout(v2::TimeoutEvent *event) { - const auto CTX = static_cast(event->data); - IOCPService::onTimeout(CTX); -} - -int IOCPService::alpnCallbackFunction([[maybe_unused]] void *ssl, const uint8_t **out, uint8_t *out_length, const uint8_t *in, uint32_t in_length, IOCPService *service) { - return service->master->onAlpnSelect(out, out_length, in, in_length); -} - -sese::event::BaseEvent *IOCPService::createEventEx(int fd, unsigned int events, void *data) { - auto event = createEvent(fd, events, data); - eventSet.emplace(event); - return event; -} - -void IOCPService::freeEventEx(sese::event::BaseEvent *event) { - eventSet.erase(event); - freeEvent(event); -} - -void IOCPService::releaseContext(Context *ctx) { - if (ctx->ssl) { - // std::printf("ssl free %p\n", ctx->ssl); - SSL_free(static_cast(ctx->ssl)); - ctx->ssl = nullptr; - } - if (ctx->timeoutEvent) { - cancelTimeoutEvent(ctx->timeoutEvent); - } - sese::net::Socket::close(ctx->fd); - ctx->client->freeEventEx(ctx->event); - ctx->self->getDeleteContextCallback()(ctx); - delete ctx; -} - -int64_t IOCPService::read(int fd, void *buffer, size_t len, void *ssl) { - if (ssl) { - return SSL_read(static_cast(ssl), buffer, static_cast(len)); - } else { - return sese::net::Socket::read(fd, buffer, len, 0); - } -} - -int64_t IOCPService::write(int fd, const void *buffer, size_t len, void *ssl) { - if (ssl) { - return SSL_write(static_cast(ssl), buffer, static_cast(len)); - } else { - return sese::net::Socket::write(fd, buffer, len, 0); - } -} - -#pragma endregion Service - -#pragma region Server - -IOCPServer::IOCPServer() { - this->balanceLoader.setOnDispatchedCallbackFunction( - [&](int fd, sese::event::EventLoop *event_loop, void *data) { - this->preConnectCallback(fd, event_loop, static_cast(data)); - } - ); -} - -bool IOCPServer::init() { - if (balanceLoader.init([this]() -> auto { - return new IOCPService(this, this->activeReleaseMode); - })) { - balanceLoader.start(); - return true; - } - return false; -} - -void IOCPServer::shutdown() { - balanceLoader.stop(); -} - -void IOCPServer::postRead(Context *ctx) { - ctx->client->postRead(ctx); -} - -void IOCPServer::postWrite(Context *ctx) { - ctx->client->postWrite(ctx); -} - -void IOCPServer::postClose(Context *ctx) { - ctx->client->postClose(ctx); -} - -void IOCPServer::postConnect(const net::IPAddress::Ptr &to, const security::SSLContext::Ptr &cli_ctx, void *data) { - const auto SOCK = sese::net::Socket::socket(to->getFamily(), SOCK_STREAM, IPPROTO_IP); - sese::net::Socket::setNonblocking(SOCK); - - if (connect(SOCK, to->getRawAddress(), to->getRawAddressLength())) { - if (const auto ERR = sese::net::getNetworkError(); ERR != EINPROGRESS) { - return; - } - } - - const auto CTX = new Context; - CTX->fd = SOCK; - CTX->self = this; - CTX->isConn = true; - CTX->data = data; - if (cli_ctx) { - CTX->ssl = SSL_new(static_cast(cli_ctx->getContext())); - // printf("ssl new %p\n", ctx->ssl); - SSL_set_fd(static_cast(CTX->ssl), static_cast(SOCK)); - SSL_set_alpn_protos(static_cast(CTX->ssl), reinterpret_cast(clientProtos.c_str()), static_cast(clientProtos.length())); - } - - balanceLoader.dispatchSocket(SOCK, CTX); -} - -void IOCPServer::setTimeout(Context *ctx, int64_t seconds) { - if (ctx->timeoutEvent) { - ctx->client->cancelTimeoutEvent(ctx->timeoutEvent); - ctx->timeoutEvent = nullptr; - } - ctx->timeoutEvent = ctx->client->setTimeoutEvent(seconds, ctx); - ctx->timeoutEvent->data = ctx; -} - -void IOCPServer::cancelTimeout(Context *ctx) { - if (ctx->timeoutEvent) { - ctx->client->cancelTimeoutEvent(ctx->timeoutEvent); - ctx->timeoutEvent = nullptr; - } -} - -int IOCPServer::onAlpnSelect(const uint8_t **out, uint8_t *out_length, const uint8_t *in, uint32_t in_length) { - if (SSL_select_next_proto(const_cast(out), out_length, reinterpret_cast(servProtos.c_str()), static_cast(servProtos.length()), in, in_length) != OPENSSL_NPN_NEGOTIATED) { - return SSL_TLSEXT_ERR_NOACK; - } - return SSL_TLSEXT_ERR_OK; -} - -void IOCPServer::preConnectCallback([[maybe_unused]] int fd, sese::event::EventLoop *event_loop, Context *ctx) { - if (ctx) { - ctx->client = reinterpret_cast(event_loop); - onPreConnect(ctx); - postWrite(ctx); - } -} - -#pragma endregion Server diff --git a/sese/service/iocp/IOCPServer_V1.h b/sese/service/iocp/IOCPServer_V1.h deleted file mode 100644 index 60290a3a51..0000000000 --- a/sese/service/iocp/IOCPServer_V1.h +++ /dev/null @@ -1,383 +0,0 @@ -// Copyright 2024 libsese -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -/** - * @file IOCPServer_V1.h - * @brief IOCP server based on sese-event - * @author kaoru - * @version 0.1 - * @date September 25, 2023 - */ - -#pragma once - -#include -#include -#include -#include - -#include - -namespace sese::iocp::v1 { - -class Context; -class IOCPServer; -class IOCPService; - -/// IOCP server based on sese-event -class IOCPServer { -public: - virtual ~IOCPServer() = default; - using DeleteContextCallback = std::function; - - IOCPServer(); - - /** - * Initialize and start the server - * @return Initialization result - */ - bool init(); - /** - * Terminate worker threads, release system resources, and shut down the server - */ - void shutdown(); - /** - * Post a read event - * @param ctx Operation context - */ - static void postRead(Context *ctx); - /** - * Post a write event - * @param ctx Operation context - */ - static void postWrite(Context *ctx); - /** - * Post a close event - * @param ctx Operation context - */ - static void postClose(Context *ctx); - /** - * Post a connection event - * @param to Connection address - * @param cli_ctx SSL client context - * @param data Additional data - */ - void postConnect(const net::IPAddress::Ptr &to, const security::SSLContext::Ptr &cli_ctx, void *data = nullptr); - /** - * Set a timeout event - * @param ctx Operation context - * @param seconds Timeout duration - */ - static void setTimeout(Context *ctx, int64_t seconds); - /** - * Cancel the timeout event - * @param ctx Operation context - */ - static void cancelTimeout(Context *ctx); - /** - * Default context release callback function - */ - static void onDeleteContext(Context *) {} - /** - * Handshake completed callback function - * @param ctx Operation context - */ - virtual void onAcceptCompleted(Context *ctx) {} - /** - * Callback function triggered by a read event - * @param ctx Operation context - */ - virtual void onPreRead(Context *ctx) {} - /** - * Callback function for completion of a read event - * @param ctx Operation context - */ - virtual void onReadCompleted(Context *ctx) {} - /** - * Write event completed callback function - * @param ctx Operation context - */ - virtual void onWriteCompleted(Context *ctx) {} - /** - * Timeout event callback function - * @param ctx Operation context - */ - virtual void onTimeout(Context *ctx) {} - /** - * Connection pre-event callback function - * @param ctx Operation context - */ - virtual void onPreConnect(Context *ctx) {} - /** - * Connection event callback function - * @param ctx Operation context - */ - virtual void onConnected(Context *ctx) {} - /** - * ALPN protocol negotiation completion callback function - * @param ctx Context - * @param in Negotiation content - * @param in_length Length of negotiation content - */ - virtual void onAlpnGet(Context *ctx, const uint8_t *in, uint32_t in_length) {} - /** - * ALPN negotiation callback function - * @param out Expected content from the other side - * @param out_length Length of expected content from the other side - * @param in Response content - * @param in_length Length of response content - * @return ALPN status code - */ - int onAlpnSelect( - const uint8_t **out, uint8_t *out_length, - const uint8_t *in, uint32_t in_length - ); - -public: - /** - * Set the IP address to which the current service is bound. Setting this option will make the server automatically listen to the corresponding port of the address - * @param addr Target IP - */ - void setAddress(const net::IPAddress::Ptr &addr) { balanceLoader.setAddress(addr); } - /** - * Set the desired number of threads for the service - * @param threads Number of threads - */ - void setThreads(size_t threads) { balanceLoader.setThreads(threads); } - /** - * Set the SSL context used for server listening - * @param ctx SSL context - */ - void setServCtx(const security::SSLContext::Ptr &ctx) { IOCPServer::sslCtx = ctx; } - /** - * Set the ALPN negotiation content for the server - * @param protos Protocol negotiation content - */ - void setServProtos(const std::string &protos) { IOCPServer::servProtos = protos; } - /** - * Set the ALPN negotiation content for the client - * @param protos Protocol negotiation content - */ - void setClientProtos(const std::string &protos) { IOCPServer::clientProtos = protos; } - /** - * Set the server operation context destruction callback function - * @param callback Callback function - */ - void setDeleteContextCallback(const DeleteContextCallback &callback) { IOCPServer::deleteContextCallback = callback; } - /** - * Get the current SSL context for server listening - * @return SSL context - */ - [[nodiscard]] const security::SSLContext::Ptr &getServCtx() const { return IOCPServer::sslCtx; } - /** - * Get the current operation context destruction callback function for the service - * @return Callback function - */ - [[nodiscard]] const DeleteContextCallback &getDeleteContextCallback() const { return IOCPServer::deleteContextCallback; }; - - /** - * Get the status of the active release mode - * @return Status of the active release mode - */ - [[nodiscard]] bool isActiveReleaseMode() const { return activeReleaseMode; } - -public: - /** - * Set the connection access timeout duration. This function is only for IOCP implementations based on sese-event - * @param seconds Connection access timeout duration - */ - [[maybe_unused]] void setAcceptTimeout(uint32_t seconds) { balanceLoader.setAcceptTimeout(seconds); } - /** - * Set the connection dispatch timeout duration. This function is only for IOCP implementations based on sese-event - * @param seconds Connection dispatch timeout duration - */ - [[maybe_unused]] void setDispatchTimeout(uint32_t seconds) { balanceLoader.setDispatchTimeout(seconds); } - -protected: - void preConnectCallback(int fd, sese::event::EventLoop *event_loop, Context *ctx); - - /** - * Set active release mode - * @param enable Enable or disable - */ - void setActiveReleaseMode(bool enable) { activeReleaseMode = enable; } - - DeleteContextCallback deleteContextCallback = onDeleteContext; - security::SSLContext::Ptr sslCtx{}; - std::string servProtos{}; - std::string clientProtos{}; - service::UserBalanceLoader balanceLoader; - -private: - bool activeReleaseMode = true; -}; - -/// Completion port operation context based on sese-event -class Context final : public io::InputStream, public io::OutputStream, public io::PeekableStream { - friend class IOCPServer; - friend class IOCPService; - IOCPServer *self{}; - IOCPService *client{}; - socket_t fd{0}; - void *ssl{}; - bool isConn{false}; - event::BaseEvent *event{}; - service::v2::TimeoutEvent *timeoutEvent{}; - io::ByteBuilder send{8192, 8192}; - io::ByteBuilder recv{8192, 8192}; - void *data{}; - -public: - /** - * Read content from the current connection - * @param buffer Buffer - * @param length Size of buffer - * @return Actual read size - */ - int64_t read(void *buffer, size_t length) override; - /** - * Write content to the current connection - * @param buffer Buffer - * @param length Size of buffer - * @return Actual written size - */ - int64_t write(const void *buffer, size_t length) override; - /** - * Read content from the current connection without advancing - * @param buffer Buffer - * @param length Size of buffer - * @return Actual read size - */ - int64_t peek(void *buffer, size_t length) override; - /** - * Advance in the current connection without reading content - * @param length Advance size - * @return Actual advance size - */ - int64_t trunc(size_t length) override; - /** - * Get the file descriptor of the current context connection - * @return File descriptor - */ - [[nodiscard]] int32_t getFd() const { return static_cast(Context::fd); } - /** - * Get additional data of the current context - * @return Additional data - */ - [[nodiscard]] void *getData() const { return Context::data; } - /** - * Set additional data for the current context - * @param p_data Additional data - */ - [[maybe_unused]] void setData(void *p_data) { Context::data = p_data; } -}; - -/// Completion port sub-service based on sese-event -class IOCPService final : public service::v2::TimerableService { -public: - /// Initialize sub-service - /// \param master Master server - /// \param active_release_mode Active release mode - explicit IOCPService(IOCPServer *master, bool active_release_mode); - ~IOCPService() override; - - /** - * Submit read event to the master server - * @param ctx Operation context - */ - void postRead(Context *ctx); - /** - * Submit write event to the master server - * @param ctx Operation context - */ - void postWrite(Context *ctx); - /** - * Submit close event to the master server - * @param ctx Operation context - */ - void postClose(Context *ctx); - /** - * Sub-server connection access completion callback function - * @param ctx Operation context - */ - static void onAcceptCompleted(Context *ctx); - /** - * Sub-service read event trigger callback function - * @param ctx Operation context - */ - static void onPreRead(Context *ctx); - /** - * Sub-service read event completion trigger callback function - * @param ctx Operation context - */ - static void onReadCompleted(Context *ctx); - /** - * Sub-service write event completion trigger callback function - * @param ctx Operation context - */ - static void onWriteCompleted(Context *ctx); - /** - * Sub-service timeout callback function - * @param ctx Operation context - */ - static void onTimeout(Context *ctx); - /** - * Connection event callback function - * @param ctx Operation context - */ - static void onConnected(Context *ctx); - /** - * ALPN negotiation completion callback function - * @param ctx Operation context - * @param in Negotiation content - * @param in_length Negotiation content length - */ - static void onAlpnGet(Context *ctx, const uint8_t *in, uint32_t in_length); - /** - * ALPN negotiation callback function - * @param ssl SSL context - * @param out Expected content from peer - * @param out_length Length of expected content from peer - * @param in Response content - * @param in_length Length of response content - * @param service Associated sub-service - * @return ALPN status code - */ - static int alpnCallbackFunction( - void *ssl, - const uint8_t **out, uint8_t *out_length, - const uint8_t *in, uint32_t in_length, - IOCPService *service - ); - -private: - void onAccept(int fd) override; - void onRead(event::BaseEvent *event) override; - void onWrite(event::BaseEvent *event) override; - void onClose(event::BaseEvent *event) override; - void onTimeout(service::v2::TimeoutEvent *event) override; - - event::BaseEvent *createEventEx(int fd, unsigned int events, void *data); - void freeEventEx(sese::event::BaseEvent *event); - - void releaseContext(Context *ctx); - - static int64_t read(int fd, void *buffer, size_t len, void *ssl); - static int64_t write(int fd, const void *buffer, size_t len, void *ssl); - - IOCPServer *master{}; - std::set eventSet; -}; - -} // namespace sese::iocp::v1 \ No newline at end of file diff --git a/sese/test/TestHttp2.cpp b/sese/test/TestHttp2.cpp index 4bf556fabd..2d2934e28e 100644 --- a/sese/test/TestHttp2.cpp +++ b/sese/test/TestHttp2.cpp @@ -18,7 +18,6 @@ #include "sese/net/http/HPackUtil.h" #include "sese/net/http/Huffman.h" #include "sese/record/Marco.h" -#include "sese/service/BalanceLoader.h" #include "sese/system/Process.h" #include "gtest/gtest.h" diff --git a/sese/test/TestIOBuf.cpp b/sese/test/TestIOBuf.cpp index 75d9369703..2cd4420ec0 100644 --- a/sese/test/TestIOBuf.cpp +++ b/sese/test/TestIOBuf.cpp @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -#include +#include #include diff --git a/sese/test/TestIOCP.cpp b/sese/test/TestIOCP.cpp deleted file mode 100644 index cea32dbf84..0000000000 --- a/sese/test/TestIOCP.cpp +++ /dev/null @@ -1,107 +0,0 @@ -// Copyright 2024 libsese -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include - -#include -#include -#include -#include - -class MyIOCPServer : public sese::iocp::IOCPServer { -public: - MyIOCPServer() { - setDeleteContextCallback(myDeleter); - } - - void onAcceptCompleted(sese::iocp::Context *ctx) override { - SESE_INFO("onAcceptCompleted {}", ctx->getFd()); - postRead(ctx); - } - - void onPreRead(sese::iocp::Context *ctx) override { - SESE_INFO("onRreRead {}", ctx->getFd()); - } - - void onReadCompleted(sese::iocp::Context *ctx) override { - SESE_INFO("onReadCompleted {}", ctx->getFd()); - sese::streamMove(ctx, ctx, IOCP_WSABUF_SIZE); - postWrite(ctx); - } - - void onWriteCompleted(sese::iocp::Context *ctx) override { - SESE_INFO("onWriteCompleted {}", ctx->getFd()); - } - - void onConnected(sese::iocp::Context *ctx) override { - SESE_INFO("onConnected {}", ctx->getFd()); - } - - static void myDeleter(sese::iocp::Context *ctx) { - SESE_INFO("onDeleteCallback {}", ctx->getFd()); - } -}; - -TEST(TestIOCP, Server_0) { - auto address = sese::net::IPv4Address::localhost(sese::net::createRandomPort()); - - MyIOCPServer server; - server.setAddress(address); - server.setThreads(2); - server.setServProtos("\x8http/1.1"); - ASSERT_TRUE(server.init()); - SESE_INFO("server listening on {}", address->getPort()); - - sese::net::Socket socket(sese::net::Socket::Family::IPv4, sese::net::Socket::Type::TCP); - ASSERT_EQ(socket.connect(address), 0); - constexpr auto s = "Hello World"; - socket.write(s, strlen(s)); - - char buffer[32]{}; - socket.read(buffer, sizeof(buffer)); - SESE_INFO("echo from server, {}", buffer); - socket.close(); - - server.shutdown(); -} - -TEST(TestIOCP, Server_1) { - auto serv_ctx = sese::security::SSLContextBuilder::SSL4Server(); - serv_ctx->importCertFile(PROJECT_PATH "/sese/test/Data/test-ca.crt"); - serv_ctx->importPrivateKeyFile(PROJECT_PATH "/sese/test/Data/test-key.pem"); - ASSERT_TRUE(serv_ctx->authPrivateKey()); - - auto address = sese::net::IPv4Address::localhost(sese::net::createRandomPort()); - - MyIOCPServer server; - server.setAddress(address); - server.setServCtx(serv_ctx); - server.setThreads(2); - server.setServProtos("\x8http/1.1"); - ASSERT_TRUE(server.init()); - SESE_INFO("server listening on {}", address->getPort()); - - auto client_ctx = sese::security::SSLContextBuilder::SSL4Client(); - auto socket = client_ctx->newSocketPtr(sese::net::Socket::Family::IPv4, 0); - ASSERT_EQ(socket->connect(address), 0); - constexpr auto s = "Hello World"; - socket->write(s, strlen(s)); - - char buffer[32]{}; - socket->read(buffer, sizeof(buffer)); - SESE_INFO("echo from server, {}", buffer); - socket->close(); - - server.shutdown(); -} \ No newline at end of file diff --git a/sese/test/TestReusableSocket.cpp b/sese/test/TestReusableSocket.cpp deleted file mode 100644 index 3ccac4166c..0000000000 --- a/sese/test/TestReusableSocket.cpp +++ /dev/null @@ -1,126 +0,0 @@ -// Copyright 2024 libsese -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include "sese/net/ReusableSocket.h" -#include "sese/net/event/Event.h" -#include "sese/record/Marco.h" -#include "sese/thread/Thread.h" -#include "gtest/gtest.h" - -#include -#include - -using namespace std::chrono_literals; - -class MyEvent : public sese::event::EventLoop { -public: - void onAccept(int fd) override { - num++; - } - - void start() { - th = std::make_unique([this]() { - while (run) { - dispatch(100); - } - }, - "MyEventThread"); - th->start(); - } - - void stop() { - run = false; - th->join(); - } - - int getNum() const { return num; } - -private: - std::atomic_int num{0}; - std::atomic_bool run{true}; - sese::Thread::Ptr th{nullptr}; -}; - -// This test should only be for Linux -#ifdef SESE_PLATFORM_LINUX - -static sese::net::IPAddress::Ptr createAddress() { - auto port = sese::net::createRandomPort(); - SESE_INFO("select port {}\n", (int) port); - return sese::net::IPv4Address::create("127.0.0.1", port); -} - -TEST(TestReusableSocket, LoadBalancing) { - auto addr = createAddress(); - sese::net::ReusableSocket reusableSocket(addr); - - auto sock1 = reusableSocket.makeRawSocket(); - auto sock2 = reusableSocket.makeRawSocket(); - auto sock3 = reusableSocket.makeSocket(); - - ASSERT_NE(sock1, -1); - ASSERT_NE(sock2, -1); - ASSERT_NE(sock3, std::nullopt); - - ASSERT_EQ(sese::net::Socket::setNonblocking(sock1), 0); - ASSERT_EQ(sese::net::Socket::setNonblocking(sock2), 0); - - sese::net::Socket::listen(sock1, 15); - sese::net::Socket::listen(sock2, 15); - - MyEvent event1; - event1.setListenFd((int) sock1); - ASSERT_TRUE(event1.init()); - - MyEvent event2; - event2.setListenFd((int) sock2); - ASSERT_TRUE(event2.init()); - - event1.start(); - event2.start(); - - std::vector socketVector; - for (int i = 0; i < 20; ++i) { - auto s = sese::net::Socket(sese::net::Socket::Family::IPv4, sese::net::Socket::Type::TCP); - s.setNonblocking(true); - socketVector.emplace_back(s); - } - for (decltype(auto) s: socketVector) { - s.connect(addr); - } - std::this_thread::sleep_for(300ms); - for (decltype(auto) s: socketVector) { - s.close(); - } - - event1.stop(); - event2.stop(); - - sese::net::Socket::close(sock1); - sese::net::Socket::close(sock2); - - SESE_INFO("Socket1: {}\nSocket2: {}\n", event1.getNum(), event2.getNum()); -} - -#endif - -TEST(TestReusableSocket, Error) { - auto addr = sese::net::IPv4Address::create("0.0.0.1", 0); - sese::net::ReusableSocket reusable_socket(addr); - - auto sock1 = reusable_socket.makeRawSocket(); - auto sock2 = reusable_socket.makeSocket(); - ASSERT_EQ(sock1, -1); - ASSERT_EQ(sock2, std::nullopt); -} \ No newline at end of file diff --git a/sese/test/TestService.cpp b/sese/test/TestService.cpp index 560d57481f..6a240218df 100644 --- a/sese/test/TestService.cpp +++ b/sese/test/TestService.cpp @@ -15,18 +15,12 @@ #include "sese/config/Json.h" #include "sese/net/Socket.h" #include "sese/net/http/RequestableFactory.h" -#include "sese/service/SystemBalanceLoader.h" -#include "sese/service/UserBalanceLoader.h" -#include "sese/service/TimerableService_V1.h" -#include "sese/service/TimerableService_V2.h" #include "sese/service/http/HttpServer.h" #include "sese/security/SSLContextBuilder.h" #include "sese/io/ConsoleOutputStream.h" #include "sese/record/Marco.h" #include "gtest/gtest.h" -#pragma region HttpServer_V3 - #define ASSERT_NOT_NULL(x) ASSERT_TRUE(x != nullptr) SESE_CTRL(MyController) { @@ -238,221 +232,3 @@ TEST_F(TestHttpServerV3, Range) { range(false, port); } -#pragma endregion - -#pragma region BuiltinEventModel - -#define printf SESE_INFO - -class MyService final : public sese::event::EventLoop { -public: - ~MyService() override { - printf("total socket into: {}", num); - } - - void onAccept(int fd) override { - num += 1; - sese::net::Socket::close(fd); - } - -protected: - int num = 0; -}; - -static sese::net::IPAddress::Ptr createAddress() { - auto port = sese::net::createRandomPort(); - printf("select port {}", (int) port); - return sese::net::IPv4Address::create("127.0.0.1", port); -} - -TEST(TestService, SystemBalanceLoader) { - auto addr = createAddress(); - - sese::service::SystemBalanceLoader service; - service.setThreads(4); - service.setAddress(addr); - ASSERT_TRUE(service.init()); - - service.start(); - ASSERT_TRUE(service.isStarted()); - - std::vector socket_vector; - for (int i = 0; i < 20; ++i) { - auto s = sese::net::Socket(sese::net::Socket::Family::IPv4, sese::net::Socket::Type::TCP); - sese::net::Socket::setNonblocking(true); - socket_vector.emplace_back(s); - } - for (decltype(auto) s: socket_vector) { - s.connect(addr); - } - std::this_thread::sleep_for(300ms); - for (decltype(auto) s: socket_vector) { - s.close(); - } - - std::this_thread::sleep_for(300ms); - service.stop(); -} - -TEST(TestService, UserBalanceLoader) { - auto addr = createAddress(); - - sese::service::UserBalanceLoader service; - service.setThreads(3); - service.setAddress(addr); - ASSERT_TRUE(service.init()); - - service.start(); - ASSERT_TRUE(service.isStarted()); - - std::vector socket_vector; - for (int i = 0; i < 20; ++i) { - auto s = sese::net::Socket(sese::net::Socket::Family::IPv4, sese::net::Socket::Type::TCP); - sese::net::Socket::setNonblocking(true); - socket_vector.emplace_back(s); - } - for (decltype(auto) s: socket_vector) { - s.connect(addr); - } - std::this_thread::sleep_for(500ms); - for (decltype(auto) s: socket_vector) { - s.close(); - } - - std::this_thread::sleep_for(300ms); - service.stop(); -} - -class MyTimerableServiceV1 : public sese::service::v1::TimerableService { -public: - void onAccept(int fd) override { - printf("fd {} connect", fd); - if (0 == sese::net::Socket::setNonblocking(static_cast(fd))) { - auto event = createEvent(fd, EVENT_READ, nullptr); - createTimeoutEvent(fd, event, 3); - } else { - sese::net::Socket::close(fd); - } - } - - void onRead(sese::event::BaseEvent *event) override { - auto timeout_event = getTimeoutEventByFd(event->fd); - // timeoutEvent will not be nullptr - cancelTimeoutEvent(timeout_event); - char buffer[1024]{}; - sese::net::Socket::read(timeout_event->fd, buffer, 1024, 0); - puts(buffer); - setEvent(event); - setTimeoutEvent(timeout_event, 3); - } - - void onTimeout(sese::service::v1::TimeoutEvent *timeout_event) override { - printf("fd {} close", timeout_event->fd); - sese::net::Socket::close(timeout_event->fd); - auto event = static_cast(timeout_event->data); - freeEvent(event); - } -}; - -TEST(TestService, TimerableService) { - auto addr = createAddress(); - - sese::service::UserBalanceLoader service; - service.setThreads(3); - service.setAddress(addr); - service.setAcceptTimeout(500); - service.setDispatchTimeout(500); - ASSERT_TRUE(service.init()); - - service.start(); - ASSERT_TRUE(service.isStarted()); - - std::vector socket_vector; - for (int i = 0; i < 20; ++i) { - auto s = sese::net::Socket(sese::net::Socket::Family::IPv4, sese::net::Socket::Type::TCP); - sese::net::Socket::setNonblocking(true); - socket_vector.emplace_back(s); - } - for (decltype(auto) s: socket_vector) { - s.connect(addr); - } - socket_vector[4].write("Hello", 5); // NOLINT - std::this_thread::sleep_for(5s); - for (decltype(auto) s: socket_vector) { - s.close(); - } - - std::this_thread::sleep_for(300ms); - service.stop(); -} - -class MyTimerableServiceV2 : public sese::service::v2::TimerableService { -public: - void onAccept(int fd) override { - printf("fd {} connect", fd); - if (0 == sese::net::Socket::setNonblocking(static_cast(fd))) { - auto event = createEvent(fd, EVENT_READ, nullptr); - auto timeout = setTimeoutEvent(3, nullptr); - timeout->data = event; - event->data = timeout; - } else { - sese::net::Socket::close(fd); - } - } - - void onRead(sese::event::BaseEvent *event) override { - auto timeout_event = static_cast(event->data); - // timeoutEvent will not be nullptr - cancelTimeoutEvent(timeout_event); - timeout_event = nullptr; - // timeoutEvent has been delete - char buffer[1024]{}; - sese::net::Socket::read(event->fd, buffer, 1024, 0); - puts(buffer); - - timeout_event = setTimeoutEvent(3, nullptr); - setEvent(event); - event->data = timeout_event; - timeout_event->data = event; - } - - void onTimeout(sese::service::v2::TimeoutEvent *timeout_event) override { - auto event = static_cast(timeout_event->data); - printf("fd {} close", event->fd); - sese::net::Socket::close(event->fd); - freeEvent(event); - } -}; - -TEST(TestService, TimerableService_V2) { - auto addr = createAddress(); - - sese::service::UserBalanceLoader service; - service.setThreads(3); - service.setAddress(addr); - service.setAcceptTimeout(500); - service.setDispatchTimeout(500); - ASSERT_TRUE(service.init()); - - service.start(); - ASSERT_TRUE(service.isStarted()); - - std::vector socket_vector; - for (int i = 0; i < 20; ++i) { - auto s = sese::net::Socket(sese::net::Socket::Family::IPv4, sese::net::Socket::Type::TCP); - sese::net::Socket::setNonblocking(true); - socket_vector.emplace_back(s); - } - for (decltype(auto) s: socket_vector) { - s.connect(addr); - } - socket_vector[4].write("Hello", 5); // NOLINT - std::this_thread::sleep_for(5s); - for (decltype(auto) s: socket_vector) { - s.close(); - } - - std::this_thread::sleep_for(300ms); -} - -#pragma endregion \ No newline at end of file diff --git a/sese/test/TestWebsocketService.cpp b/sese/test/TestWebsocketService.cpp index 65dde53590..44e312a029 100644 --- a/sese/test/TestWebsocketService.cpp +++ b/sese/test/TestWebsocketService.cpp @@ -12,8 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -#include -#include #include #include diff --git a/sese/service/iocp/IOBuf.cpp b/sese/util/IOBuf.cpp similarity index 98% rename from sese/service/iocp/IOBuf.cpp rename to sese/util/IOBuf.cpp index e18ef0cb45..7936564acb 100644 --- a/sese/service/iocp/IOBuf.cpp +++ b/sese/util/IOBuf.cpp @@ -12,8 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -#include "sese/service/iocp/IOBuf.h" -#include "sese/record/Marco.h" +#include "IOBuf.h" sese::iocp::IOBufNode::IOBufNode(size_t capacity) : CAPACITY(capacity) { buffer = malloc(capacity); diff --git a/sese/service/iocp/IOBuf.h b/sese/util/IOBuf.h similarity index 100% rename from sese/service/iocp/IOBuf.h rename to sese/util/IOBuf.h