diff --git a/CMakeLists.txt b/CMakeLists.txt index 97129aba3a22..1e0db289257e 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -893,6 +893,8 @@ add_library(native STATIC ext/native/base/stringutil.h ext/native/base/timeutil.cpp ext/native/base/timeutil.h + ext/native/data/base64.cpp + ext/native/data/base64.h ext/native/data/compression.cpp ext/native/data/compression.h ext/native/file/chunk_file.cpp @@ -985,6 +987,8 @@ add_library(native STATIC ext/native/net/sinks.h ext/native/net/url.cpp ext/native/net/url.h + ext/native/net/websocket_server.cpp + ext/native/net/websocket_server.h ext/native/profiler/profiler.cpp ext/native/profiler/profiler.h ext/native/thin3d/thin3d.cpp @@ -1389,6 +1393,8 @@ add_library(${CoreLibName} ${CoreLinkType} Core/HDRemaster.cpp Core/HDRemaster.h Core/ThreadEventQueue.h + Core/WebServer.cpp + Core/WebServer.h Core/Debugger/Breakpoints.cpp Core/Debugger/Breakpoints.h Core/Debugger/DebugInterface.h diff --git a/Core/Core.vcxproj b/Core/Core.vcxproj index 209890e9f13f..b068e321e9c5 100644 --- a/Core/Core.vcxproj +++ b/Core/Core.vcxproj @@ -511,6 +511,7 @@ AnySuitable + @@ -746,6 +747,7 @@ + diff --git a/Core/Core.vcxproj.filters b/Core/Core.vcxproj.filters index 2efbdbd29adb..fe637033f806 100644 --- a/Core/Core.vcxproj.filters +++ b/Core/Core.vcxproj.filters @@ -689,6 +689,9 @@ Core + + Core + @@ -1268,6 +1271,9 @@ Core + + Core + diff --git a/Core/WebServer.cpp b/Core/WebServer.cpp new file mode 100644 index 000000000000..6b20646f2e9a --- /dev/null +++ b/Core/WebServer.cpp @@ -0,0 +1,278 @@ +// Copyright (c) 2014- PPSSPP Project. + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, version 2.0 or later versions. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License 2.0 for more details. + +// A copy of the GPL 2.0 should have been included with the program. +// If not, see http://www.gnu.org/licenses/ + +// Official git repository and contact information can be found at +// https://github.com/hrydgard/ppsspp and http://www.ppsspp.org/. + +#include +#include +#include +#include +#include "base/stringutil.h" +#include "base/timeutil.h" +#include "file/fd_util.h" +#include "net/http_client.h" +#include "net/http_server.h" +#include "net/sinks.h" +#include "thread/threadutil.h" +#include "Common/FileUtil.h" +#include "Common/Log.h" +#include "Core/Config.h" +#include "Core/WebServer.h" + +enum class ServerStatus { + STOPPED, + STARTING, + RUNNING, + STOPPING, + RESTARTING, +}; + +static const char *REPORT_HOSTNAME = "report.ppsspp.org"; +static const int REPORT_PORT = 80; + +static std::thread serverThread; +static ServerStatus serverStatus; +static std::mutex serverStatusLock; +static int serverFlags; + +static void UpdateStatus(ServerStatus s) { + std::lock_guard guard(serverStatusLock); + serverStatus = s; +} + +static bool UpdateStatus(ServerStatus s, ServerStatus old) { + std::lock_guard guard(serverStatusLock); + if (serverStatus == old) { + serverStatus = s; + return true; + } + return false; +} + +static ServerStatus RetrieveStatus() { + std::lock_guard guard(serverStatusLock); + return serverStatus; +} + +// This reports the local IP address to report.ppsspp.org, which can then +// relay that address to a mobile device searching for the server. +static void RegisterServer(int port) { + http::Client http; + Buffer theVoid; + + char resource4[1024] = {}; + if (http.Resolve(REPORT_HOSTNAME, REPORT_PORT, net::DNSType::IPV4)) { + if (http.Connect()) { + std::string ip = fd_util::GetLocalIP(http.sock()); + snprintf(resource4, sizeof(resource4) - 1, "/match/update?local=%s&port=%d", ip.c_str(), port); + + http.GET(resource4, &theVoid); + theVoid.Skip(theVoid.size()); + http.Disconnect(); + } + } + + if (http.Resolve(REPORT_HOSTNAME, REPORT_PORT, net::DNSType::IPV6)) { + // We register both IPv4 and IPv6 in case the other client is using a different one. + if (resource4[0] != 0 && http.Connect()) { + http.GET(resource4, &theVoid); + theVoid.Skip(theVoid.size()); + http.Disconnect(); + } + + // Currently, we're not using keepalive, so gotta reconnect... + if (http.Connect()) { + char resource6[1024] = {}; + std::string ip = fd_util::GetLocalIP(http.sock()); + snprintf(resource6, sizeof(resource6) - 1, "/match/update?local=%s&port=%d", ip.c_str(), port); + + http.GET(resource6, &theVoid); + theVoid.Skip(theVoid.size()); + http.Disconnect(); + } + } +} + +static void RegisterDiscHandlers(http::Server *http, std::unordered_map *paths) { + for (std::string filename : g_Config.recentIsos) { +#ifdef _WIN32 + static const std::string sep = "\\/"; +#else + static const std::string sep = "/"; +#endif + size_t basepos = filename.find_last_of(sep); + std::string basename = "/" + (basepos == filename.npos ? filename : filename.substr(basepos + 1)); + + // Let's not serve directories, since they won't work. Only single files. + // Maybe can do PBPs and other files later. Would be neat to stream virtual disc filesystems. + if (endsWithNoCase(basename, ".cso") || endsWithNoCase(basename, ".iso")) { + (*paths)[ReplaceAll(basename, " ", "%20")] = filename; + } + } + + auto handler = [paths](const http::Request &request) { + std::string filename = (*paths)[request.resource()]; + s64 sz = File::GetFileSize(filename); + + std::string range; + if (request.Method() == http::RequestHeader::HEAD) { + request.WriteHttpResponseHeader(200, sz, "application/octet-stream", "Accept-Ranges: bytes\r\n"); + } else if (request.GetHeader("range", &range)) { + s64 begin = 0, last = 0; + if (sscanf(range.c_str(), "bytes=%lld-%lld", &begin, &last) != 2) { + request.WriteHttpResponseHeader(400, -1, "text/plain"); + request.Out()->Push("Could not understand range request."); + return; + } + + if (begin < 0 || begin > last || last >= sz) { + request.WriteHttpResponseHeader(416, -1, "text/plain"); + request.Out()->Push("Range goes outside of file."); + return; + } + + FILE *fp = File::OpenCFile(filename, "rb"); + if (!fp || fseek(fp, begin, SEEK_SET) != 0) { + request.WriteHttpResponseHeader(500, -1, "text/plain"); + request.Out()->Push("File access failed."); + if (fp) { + fclose(fp); + } + return; + } + + s64 len = last - begin + 1; + char contentRange[1024]; + sprintf(contentRange, "Content-Range: bytes %lld-%lld/%lld\r\n", begin, last, sz); + request.WriteHttpResponseHeader(206, len, "application/octet-stream", contentRange); + + const size_t CHUNK_SIZE = 16 * 1024; + char *buf = new char[CHUNK_SIZE]; + for (s64 pos = 0; pos < len; pos += CHUNK_SIZE) { + s64 chunklen = std::min(len - pos, (s64)CHUNK_SIZE); + fread(buf, chunklen, 1, fp); + request.Out()->Push(buf, chunklen); + } + fclose(fp); + delete [] buf; + request.Out()->Flush(); + } else { + request.WriteHttpResponseHeader(418, -1, "text/plain"); + request.Out()->Push("This server only supports range requests."); + } + }; + + for (auto pair : *paths) { + http->RegisterHandler(pair.first.c_str(), handler); + } +} + +static void ExecuteWebServer() { + setCurrentThreadName("HTTPServer"); + + auto http = new http::Server(new threading::NewThreadExecutor()); + std::unordered_map discPaths; + + if (serverFlags & (int)WebServerFlags::DISCS) { + RegisterDiscHandlers(http, &discPaths); + } + + if (!http->Listen(g_Config.iRemoteISOPort)) { + if (!http->Listen(0)) { + ERROR_LOG(FILESYS, "Unable to listen on any port"); + UpdateStatus(ServerStatus::STOPPED); + return; + } + } + UpdateStatus(ServerStatus::RUNNING); + + g_Config.iRemoteISOPort = http->Port(); + RegisterServer(http->Port()); + double lastRegister = real_time_now(); + while (RetrieveStatus() == ServerStatus::RUNNING) { + http->RunSlice(1.0); + + double now = real_time_now(); + if (now > lastRegister + 540.0) { + RegisterServer(http->Port()); + lastRegister = now; + } + } + + http->Stop(); + + // Move to STARTING to lock flags/STOPPING. + if (UpdateStatus(ServerStatus::STARTING, ServerStatus::RESTARTING)) { + ExecuteWebServer(); + } else { + UpdateStatus(ServerStatus::STOPPED); + } +} + +bool StartWebServer(WebServerFlags flags) { + std::lock_guard guard(serverStatusLock); + switch (serverStatus) { + case ServerStatus::RUNNING: + case ServerStatus::RESTARTING: + if ((serverFlags & (int)flags) == (int)flags) { + // Already running those flags. + return false; + } + serverStatus = ServerStatus::RESTARTING; + serverFlags |= (int)flags; + return true; + + case ServerStatus::STOPPED: + serverStatus = ServerStatus::STARTING; + serverFlags = (int)flags; + serverThread = std::thread(&ExecuteWebServer); + serverThread.detach(); + return true; + + default: + return false; + } +} + +bool StopWebServer(WebServerFlags flags) { + std::lock_guard guard(serverStatusLock); + if (serverStatus != ServerStatus::RUNNING && serverStatus != ServerStatus::RESTARTING) { + return false; + } + + serverFlags &= ~(int)flags; + if (serverFlags == 0) { + serverStatus = ServerStatus::STOPPING; + } else { + serverStatus = ServerStatus::RESTARTING; + } + return true; +} + +bool WebServerStopping(WebServerFlags flags) { + std::lock_guard guard(serverStatusLock); + if (serverStatus == ServerStatus::RESTARTING) { + return (serverFlags & (int)flags) == 0; + } + return serverStatus == ServerStatus::STOPPING; +} + +bool WebServerStopped(WebServerFlags flags) { + std::lock_guard guard(serverStatusLock); + if (serverStatus == ServerStatus::RUNNING) { + return (serverFlags & (int)flags) == 0; + } + return serverStatus == ServerStatus::STOPPED; +} diff --git a/Core/WebServer.h b/Core/WebServer.h new file mode 100644 index 000000000000..db0967cbf227 --- /dev/null +++ b/Core/WebServer.h @@ -0,0 +1,27 @@ +// Copyright (c) 2014- PPSSPP Project. + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, version 2.0 or later versions. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License 2.0 for more details. + +// A copy of the GPL 2.0 should have been included with the program. +// If not, see http://www.gnu.org/licenses/ + +// Official git repository and contact information can be found at +// https://github.com/hrydgard/ppsspp and http://www.ppsspp.org/. + +enum class WebServerFlags { + DISCS = 1, + + ALL = 1, +}; + +bool StartWebServer(WebServerFlags flags); +bool StopWebServer(WebServerFlags flags); +bool WebServerStopping(WebServerFlags flags); +bool WebServerStopped(WebServerFlags flags); diff --git a/UI/NativeApp.cpp b/UI/NativeApp.cpp index 7130c7d181ae..8ae0bedb41dc 100644 --- a/UI/NativeApp.cpp +++ b/UI/NativeApp.cpp @@ -84,6 +84,7 @@ #include "Core/HLE/sceUsbGps.h" #include "Core/Util/GameManager.h" #include "Core/Util/AudioFormat.h" +#include "Core/WebServer.h" #include "GPU/GPUInterface.h" #include "ui_atlas.h" @@ -587,7 +588,8 @@ void NativeInit(int argc, const char *argv[], const char *savegame_dir, const ch } if (g_Config.bRemoteShareOnStartup) { - StartRemoteISOSharing(); + // TODO: Separate config options. + StartWebServer(WebServerFlags::ALL); } std::string sysName = System_GetProperty(SYSPROP_NAME); diff --git a/UI/RemoteISOScreen.cpp b/UI/RemoteISOScreen.cpp index 9831701e17a9..31d8f9f9667a 100644 --- a/UI/RemoteISOScreen.cpp +++ b/UI/RemoteISOScreen.cpp @@ -18,20 +18,15 @@ #include #include #include -#include #include "base/timeutil.h" -#include "file/fd_util.h" #include "i18n/i18n.h" #include "json/json_reader.h" #include "net/http_client.h" -#include "net/http_server.h" #include "net/resolve.h" -#include "net/sinks.h" -#include "thread/threadutil.h" #include "Common/Common.h" -#include "Common/FileUtil.h" #include "Core/Config.h" +#include "Core/WebServer.h" #include "UI/RemoteISOScreen.h" using namespace UI; @@ -39,190 +34,9 @@ using namespace UI; static const char *REPORT_HOSTNAME = "report.ppsspp.org"; static const int REPORT_PORT = 80; -enum class ServerStatus { - STOPPED, - STARTING, - RUNNING, - STOPPING, -}; - -static std::thread *serverThread = nullptr; -static ServerStatus serverStatus; -static std::mutex serverStatusLock; -static std::condition_variable serverStatusCond; - static bool scanCancelled = false; static bool scanAborted = false; -static void UpdateStatus(ServerStatus s) { - std::lock_guard guard(serverStatusLock); - serverStatus = s; - serverStatusCond.notify_one(); -} - -static ServerStatus RetrieveStatus() { - std::lock_guard guard(serverStatusLock); - return serverStatus; -} - -// This reports the local IP address to report.ppsspp.org, which can then -// relay that address to a mobile device searching for the server. -static void RegisterServer(int port) { - http::Client http; - Buffer theVoid; - - char resource4[1024] = {}; - if (http.Resolve(REPORT_HOSTNAME, REPORT_PORT, net::DNSType::IPV4)) { - if (http.Connect(2, 20.0, &scanCancelled)) { - std::string ip = fd_util::GetLocalIP(http.sock()); - snprintf(resource4, sizeof(resource4) - 1, "/match/update?local=%s&port=%d", ip.c_str(), port); - - http.GET(resource4, &theVoid); - theVoid.Skip(theVoid.size()); - http.Disconnect(); - } - } - - if (http.Resolve(REPORT_HOSTNAME, REPORT_PORT, net::DNSType::IPV6)) { - // We register both IPv4 and IPv6 in case the other client is using a different one. - if (resource4[0] != 0 && http.Connect()) { - http.GET(resource4, &theVoid); - theVoid.Skip(theVoid.size()); - http.Disconnect(); - } - - // Currently, we're not using keepalive, so gotta reconnect... - if (http.Connect()) { - char resource6[1024] = {}; - std::string ip = fd_util::GetLocalIP(http.sock()); - snprintf(resource6, sizeof(resource6) - 1, "/match/update?local=%s&port=%d", ip.c_str(), port); - - http.GET(resource6, &theVoid); - theVoid.Skip(theVoid.size()); - http.Disconnect(); - } - } -} - -static void ExecuteServer() { - setCurrentThreadName("HTTPServer"); - - auto http = new http::Server(new threading::SameThreadExecutor()); - - std::map paths; - for (std::string filename : g_Config.recentIsos) { -#ifdef _WIN32 - static const std::string sep = "\\/"; -#else - static const std::string sep = "/"; -#endif - size_t basepos = filename.find_last_of(sep); - std::string basename = "/" + (basepos == filename.npos ? filename : filename.substr(basepos + 1)); - - // Let's not serve directories, since they won't work. Only single files. - // Maybe can do PBPs and other files later. Would be neat to stream virtual disc filesystems. - if (endsWithNoCase(basename, ".cso") || endsWithNoCase(basename, ".iso")) { - paths[ReplaceAll(basename, " ", "%20")] = filename; - } - } - - auto handler = [&](const http::Request &request) { - std::string filename = paths[request.resource()]; - s64 sz = File::GetFileSize(filename); - - std::string range; - if (request.Method() == http::RequestHeader::HEAD) { - request.WriteHttpResponseHeader(200, sz, "application/octet-stream", "Accept-Ranges: bytes\r\n"); - } else if (request.GetHeader("range", &range)) { - s64 begin = 0, last = 0; - if (sscanf(range.c_str(), "bytes=%lld-%lld", &begin, &last) != 2) { - request.WriteHttpResponseHeader(400, -1, "text/plain"); - request.Out()->Push("Could not understand range request."); - return; - } - - if (begin < 0 || begin > last || last >= sz) { - request.WriteHttpResponseHeader(416, -1, "text/plain"); - request.Out()->Push("Range goes outside of file."); - return; - } - - FILE *fp = File::OpenCFile(filename, "rb"); - if (!fp || fseek(fp, begin, SEEK_SET) != 0) { - request.WriteHttpResponseHeader(500, -1, "text/plain"); - request.Out()->Push("File access failed."); - if (fp) { - fclose(fp); - } - return; - } - - s64 len = last - begin + 1; - char contentRange[1024]; - sprintf(contentRange, "Content-Range: bytes %lld-%lld/%lld\r\n", begin, last, sz); - request.WriteHttpResponseHeader(206, len, "application/octet-stream", contentRange); - - const size_t CHUNK_SIZE = 16 * 1024; - char *buf = new char[CHUNK_SIZE]; - for (s64 pos = 0; pos < len; pos += CHUNK_SIZE) { - s64 chunklen = std::min(len - pos, (s64)CHUNK_SIZE); - fread(buf, chunklen, 1, fp); - request.Out()->Push(buf, chunklen); - } - fclose(fp); - delete [] buf; - request.Out()->Flush(); - } else { - request.WriteHttpResponseHeader(418, -1, "text/plain"); - request.Out()->Push("This server only supports range requests."); - } - }; - - for (auto pair : paths) { - http->RegisterHandler(pair.first.c_str(), handler); - } - - if (!http->Listen(g_Config.iRemoteISOPort)) { - if (!http->Listen(0)) { - ERROR_LOG(FILESYS, "Unable to listen on any port"); - UpdateStatus(ServerStatus::STOPPED); - return; - } - } - UpdateStatus(ServerStatus::RUNNING); - - g_Config.iRemoteISOPort = http->Port(); - RegisterServer(http->Port()); - double lastRegister = real_time_now(); - while (RetrieveStatus() == ServerStatus::RUNNING) { - http->RunSlice(5.0); - - double now = real_time_now(); - if (now > lastRegister + 540.0) { - RegisterServer(http->Port()); - lastRegister = now; - } - } - - http->Stop(); - - UpdateStatus(ServerStatus::STOPPED); -} - -bool StartRemoteISOSharing() { - std::lock_guard guard(serverStatusLock); - - if (serverStatus != ServerStatus::STOPPED) { - return false; - } - - serverStatus = ServerStatus::STARTING; - serverThread = new std::thread(&ExecuteServer); - serverThread->detach(); - - return true; -} - static bool FindServer(std::string &resultHost, int &resultPort) { http::Client http; Buffer result; @@ -393,11 +207,8 @@ RemoteISOScreen::RemoteISOScreen() : serverRunning_(false), serverStopping_(fals void RemoteISOScreen::update() { UIScreenWithBackground::update(); - bool nowRunning = RetrieveStatus() != ServerStatus::STOPPED; + bool nowRunning = !WebServerStopped(WebServerFlags::DISCS); if (serverStopping_ && !nowRunning) { - // Server stopped, delete the thread. - delete serverThread; - serverThread = nullptr; serverStopping_ = false; } @@ -424,11 +235,10 @@ void RemoteISOScreen::CreateViews() { rightColumnItems->SetSpacing(0.0f); Choice *browseChoice = new Choice(ri->T("Browse Games")); rightColumnItems->Add(browseChoice)->OnClick.Handle(this, &RemoteISOScreen::HandleBrowse); - ServerStatus status = RetrieveStatus(); - if (status == ServerStatus::STOPPING) { + if (WebServerStopping(WebServerFlags::DISCS)) { rightColumnItems->Add(new Choice(ri->T("Stopping..")))->SetDisabledPtr(&serverStopping_); browseChoice->SetEnabled(false); - } else if (status != ServerStatus::STOPPED) { + } else if (!WebServerStopped(WebServerFlags::DISCS)) { rightColumnItems->Add(new Choice(ri->T("Stop Sharing")))->OnClick.Handle(this, &RemoteISOScreen::HandleStopServer); browseChoice->SetEnabled(false); } else { @@ -450,7 +260,7 @@ void RemoteISOScreen::CreateViews() { } UI::EventReturn RemoteISOScreen::HandleStartServer(UI::EventParams &e) { - if (!StartRemoteISOSharing()) { + if (!StartWebServer(WebServerFlags::DISCS)) { return EVENT_SKIPPED; } @@ -458,13 +268,10 @@ UI::EventReturn RemoteISOScreen::HandleStartServer(UI::EventParams &e) { } UI::EventReturn RemoteISOScreen::HandleStopServer(UI::EventParams &e) { - std::lock_guard guard(serverStatusLock); - - if (serverStatus != ServerStatus::RUNNING) { + if (!StopWebServer(WebServerFlags::DISCS)) { return EVENT_SKIPPED; } - serverStatus = ServerStatus::STOPPING; serverStopping_ = true; RecreateViews(); @@ -683,13 +490,13 @@ void RemoteISOBrowseScreen::CreateViews() { } RemoteISOSettingsScreen::RemoteISOSettingsScreen() { - serverRunning_ = RetrieveStatus() != ServerStatus::STOPPED; + serverRunning_ = !WebServerStopped(WebServerFlags::DISCS); } void RemoteISOSettingsScreen::update() { UIDialogScreenWithBackground::update(); - bool nowRunning = RetrieveStatus() != ServerStatus::STOPPED; + bool nowRunning = !WebServerStopped(WebServerFlags::DISCS); if (serverRunning_ != nowRunning) { RecreateViews(); } diff --git a/UI/RemoteISOScreen.h b/UI/RemoteISOScreen.h index a4384d793e28..b7f7ff0f5c40 100644 --- a/UI/RemoteISOScreen.h +++ b/UI/RemoteISOScreen.h @@ -25,8 +25,6 @@ #include "UI/MiscScreens.h" #include "UI/MainScreen.h" -bool StartRemoteISOSharing(); - class RemoteISOScreen : public UIScreenWithBackground { public: RemoteISOScreen(); diff --git a/UWP/CoreUWP/CoreUWP.vcxproj b/UWP/CoreUWP/CoreUWP.vcxproj index 04eba6a66f3d..7cd70cfa7009 100644 --- a/UWP/CoreUWP/CoreUWP.vcxproj +++ b/UWP/CoreUWP/CoreUWP.vcxproj @@ -463,6 +463,7 @@ + @@ -673,6 +674,7 @@ + diff --git a/android/jni/Android.mk b/android/jni/Android.mk index 2c081f2ca74e..428134fbcbeb 100644 --- a/android/jni/Android.mk +++ b/android/jni/Android.mk @@ -297,6 +297,7 @@ EXEC_AND_LIB_FILES := \ $(SRC)/Core/Screenshot.cpp \ $(SRC)/Core/System.cpp \ $(SRC)/Core/TextureReplacer.cpp \ + $(SRC)/Core/WebServer.cpp \ $(SRC)/Core/Debugger/Breakpoints.cpp \ $(SRC)/Core/Debugger/SymbolMap.cpp \ $(SRC)/Core/Dialog/PSPDialog.cpp \ diff --git a/ext/native/Android.mk b/ext/native/Android.mk index bcef6cec25cd..b878bf146382 100644 --- a/ext/native/Android.mk +++ b/ext/native/Android.mk @@ -13,6 +13,7 @@ LOCAL_SRC_FILES :=\ base/timeutil.cpp \ base/colorutil.cpp \ base/stringutil.cpp \ + data/base64.cpp \ data/compression.cpp \ ext/rg_etc1/rg_etc1.cpp \ ext/cityhash/city.cpp \ @@ -64,6 +65,7 @@ LOCAL_SRC_FILES :=\ net/resolve.cpp \ net/sinks.cpp \ net/url.cpp \ + net/websocket_server.cpp \ profiler/profiler.cpp \ thread/executor.cpp \ thread/threadutil.cpp \ diff --git a/ext/native/data/base64.cpp b/ext/native/data/base64.cpp new file mode 100644 index 000000000000..82d428468c67 --- /dev/null +++ b/ext/native/data/base64.cpp @@ -0,0 +1,35 @@ +#include "data/base64.h" + +// TODO: This is a simple but not very efficient implementation. +std::string Base64Encode(const uint8_t *p, size_t sz) { + const char digits[] = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/"; + + size_t unpaddedLength = (4 * sz + 3) / 3; + std::string result; + result.resize((unpaddedLength + 3) & ~3, '='); + + for (size_t i = 0; i < unpaddedLength; ++i) { + // This is the index into the original string. + size_t pos = (i * 3) / 4; + int8_t off = 2 * ((i * 3) % 4); + + int c = p[pos]; + if (off > 2) { + c <<= 8; + off -= 8; + + // Grab more bits from the next character. + if (pos + 1 < sz) { + c |= p[pos + 1]; + } + } + + // Since we take from the big end, off starts at 2 and goes down. + int8_t shift = 2 - off; + + // Now take the bits at off and encode the character. + result[i] = digits[(c >> shift) & 0x3F]; + } + + return result; +} diff --git a/ext/native/data/base64.h b/ext/native/data/base64.h new file mode 100644 index 000000000000..b963bfb391dd --- /dev/null +++ b/ext/native/data/base64.h @@ -0,0 +1,5 @@ +#pragma once + +#include + +std::string Base64Encode(const uint8_t *p, size_t sz); diff --git a/ext/native/native.vcxproj b/ext/native/native.vcxproj index 556c328d4fd9..8fb5a42746dd 100644 --- a/ext/native/native.vcxproj +++ b/ext/native/native.vcxproj @@ -202,6 +202,7 @@ + @@ -238,6 +239,7 @@ + @@ -317,6 +319,7 @@ + AnySuitable @@ -700,6 +703,7 @@ + diff --git a/ext/native/native.vcxproj.filters b/ext/native/native.vcxproj.filters index 999519c3bd0b..08305fee7ce8 100644 --- a/ext/native/native.vcxproj.filters +++ b/ext/native/native.vcxproj.filters @@ -341,6 +341,12 @@ json + + net + + + data + @@ -814,6 +820,12 @@ json + + net + + + data + diff --git a/ext/native/net/http_server.cpp b/ext/native/net/http_server.cpp index ddd893e5ac3a..ace77907c45f 100644 --- a/ext/native/net/http_server.cpp +++ b/ext/native/net/http_server.cpp @@ -87,8 +87,10 @@ void Request::WriteHttpResponseHeader(int status, int64_t size, const char *mime net::OutputSink *buffer = Out(); buffer->Printf("HTTP/1.0 %03d %s\r\n", status, statusStr); buffer->Push("Server: PPSSPPServer v0.1\r\n"); - buffer->Printf("Content-Type: %s\r\n", mimeType ? mimeType : DEFAULT_MIME_TYPE); - buffer->Push("Connection: close\r\n"); + if (!mimeType || strcmp(mimeType, "websocket") != 0) { + buffer->Printf("Content-Type: %s\r\n", mimeType ? mimeType : DEFAULT_MIME_TYPE); + buffer->Push("Connection: close\r\n"); + } if (size >= 0) { buffer->Printf("Content-Length: %llu\r\n", size); } diff --git a/ext/native/net/sinks.cpp b/ext/native/net/sinks.cpp index aaac2f18881a..d8d47dd08095 100644 --- a/ext/native/net/sinks.cpp +++ b/ext/native/net/sinks.cpp @@ -201,6 +201,11 @@ bool InputSink::Empty() { return valid_ == 0; } +bool InputSink::TryFill() { + Fill(); + return !Empty(); +} + OutputSink::OutputSink(size_t fd) : fd_(fd), read_(0), write_(0), valid_(0) { fd_util::SetNonBlocking((int)fd_, true); } @@ -316,7 +321,7 @@ bool OutputSink::Block() { return true; } -bool OutputSink::Flush() { +bool OutputSink::Flush(bool allowBlock) { while (valid_ > 0) { size_t avail = std::min(BUFFER_SIZE - read_, valid_); @@ -325,7 +330,7 @@ bool OutputSink::Flush() { if (bytes == 0) { // This may also drain. Either way, keep looping. - if (!Block()) { + if (!allowBlock || !Block()) { return false; } } diff --git a/ext/native/net/sinks.h b/ext/native/net/sinks.h index 0138d74a6c9e..308c784fb0eb 100644 --- a/ext/native/net/sinks.h +++ b/ext/native/net/sinks.h @@ -21,6 +21,7 @@ class InputSink { bool Skip(size_t bytes); bool Empty(); + bool TryFill(); private: void Fill(); @@ -49,7 +50,7 @@ class OutputSink { bool PushCRLF(const std::string &s); bool Printf(const char *fmt, ...); - bool Flush(); + bool Flush(bool allowBlock = true); bool Empty(); diff --git a/ext/native/net/websocket_server.cpp b/ext/native/net/websocket_server.cpp new file mode 100644 index 000000000000..fb430ee15f64 --- /dev/null +++ b/ext/native/net/websocket_server.cpp @@ -0,0 +1,533 @@ +#include +#include +#include +#include +#ifndef _WIN32 +#include +#include +#include +#include +#include +#include +#else +#include +#include +#include +#endif +#include "base/stringutil.h" +#include "data/base64.h" +#include "net/http_server.h" +#include "net/sinks.h" +#include "net/websocket_server.h" +// TODO: Not a great cross dependency. +#include "Common/Crypto/sha1.h" + +#ifdef _WIN32 +// Function Cross-Compatibility +#define strcasecmp _stricmp +#endif + +static const char *const WEBSOCKET_GUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"; + +namespace net { + +enum class Opcode { + CONTINUE = 0, + TEXT = 1, + BINARY = 2, + + CLOSE = 8, + PING = 9, + PONG = 10, + + PAYLOAD_MAX = 2, + CONTROL_MIN = 8, + CONTROL_MAX = 10, +}; + +static inline std::string TrimString(const std::string &s) { + auto wsfront = std::find_if_not(s.begin(), s.end(), [](int c) { + // isspace() expects 0 - 255, so convert any sign-extended value. + return std::isspace(c & 0xFF); + }); + auto wsback = std::find_if_not(s.rbegin(), s.rend(), [](int c){ + return std::isspace(c & 0xFF); + }).base(); + return wsback > wsfront ? std::string(wsfront, wsback) : std::string(); +} + +static bool ListContainsNoCase(const std::string &list, const std::string value) { + std::vector split; + SplitString(list, ',', split); + + for (auto item : split) { + std::transform(item.begin(), item.end(), item.begin(), tolower); + if (TrimString(item) == value) { + return true; + } + } + + return false; +} + +WebSocketServer *WebSocketServer::CreateAsUpgrade(const http::Request &request, const std::string &protocol) { + auto requireHeader = [&](const char *name, const char *expected) { + std::string val; + if (!request.GetHeader(name, &val)) { + return false; + } + return strcasecmp(val.c_str(), expected) == 0; + }; + auto requireHeaderContains = [&](const char *name, const char *expected) { + std::string val; + if (!request.GetHeader(name, &val)) { + return false; + } + return ListContainsNoCase(val, expected); + }; + + if (!requireHeader("upgrade", "websocket") || !requireHeaderContains("connection", "upgrade")) { + request.WriteHttpResponseHeader(400, -1, "text/plain"); + request.Out()->Push("Must send a websocket request."); + return nullptr; + } + if (!requireHeader("sec-websocket-version", "13")) { + request.WriteHttpResponseHeader(400, -1, "text/plain", "Sec-WebSocket-Version: 13\r\n"); + request.Out()->Push("Unsupported version."); + return nullptr; + } + + std::string requestedProtocols; + std::string obtainedProtocolHeader; + if (!protocol.empty() && request.GetHeader("sec-websocket-protocol", &requestedProtocols)) { + if (ListContainsNoCase(requestedProtocols, protocol)) { + obtainedProtocolHeader = "Sec-WebSocket-Protocol: " + protocol + "\r\n"; + } + } + + std::string key; + if (!request.GetHeader("sec-websocket-key", &key)) { + request.WriteHttpResponseHeader(400, -1, "text/plain"); + request.Out()->Push("Cannot accept without key."); + return nullptr; + } + + key += WEBSOCKET_GUID; + unsigned char accept[20]; + sha1((unsigned char *)key.c_str(), (int)key.size(), accept); + + std::string acceptKey = Base64Encode(accept, 20); + std::string otherHeaders = StringFromFormat("Upgrade: websocket\r\nConnection: Upgrade\r\nSec-WebSocket-Accept: %s\r\n%s", acceptKey.c_str(), obtainedProtocolHeader.c_str()); + + // Okay, we're good to go then. + request.WriteHttpResponseHeader(101, -1, "websocket", otherHeaders.c_str()); + request.WritePartial(); + + return new WebSocketServer(request.fd(), request.In(), request.Out()); +} + +void WebSocketServer::Send(const std::string &str) { + assert(open_); + assert(fragmentOpcode_ == -1); + SendHeader(true, (int)Opcode::TEXT, str.size()); + SendBytes(str.c_str(), str.size()); +} + +void WebSocketServer::Send(const std::vector &payload) { + assert(open_); + assert(fragmentOpcode_ == -1); + SendHeader(true, (int)Opcode::BINARY, payload.size()); + SendBytes((const char *)&payload[0], payload.size()); +} + +void WebSocketServer::AddFragment(bool finish, const std::string &str) { + assert(open_); + if (fragmentOpcode_ == -1) { + SendHeader(finish, (int)Opcode::TEXT, str.size()); + fragmentOpcode_ = (int)Opcode::TEXT; + } else if (fragmentOpcode_ == (int)Opcode::TEXT) { + SendHeader(finish, (int)Opcode::CONTINUE, str.size()); + } else { + assert(fragmentOpcode_ == (int)Opcode::TEXT || fragmentOpcode_ == -1); + } + SendBytes(str.c_str(), str.size()); + if (finish) { + fragmentOpcode_ = -1; + } +} + +void WebSocketServer::AddFragment(bool finish, const std::vector &payload) { + assert(open_); + if (fragmentOpcode_ == -1) { + SendHeader(finish, (int)Opcode::BINARY, payload.size()); + fragmentOpcode_ = (int)Opcode::BINARY; + } else if (fragmentOpcode_ == (int)Opcode::BINARY) { + SendHeader(finish, (int)Opcode::CONTINUE, payload.size()); + } else { + assert(fragmentOpcode_ == (int)Opcode::BINARY || fragmentOpcode_ == -1); + } + SendBytes((const char *)&payload[0], payload.size()); + if (finish) { + fragmentOpcode_ = -1; + } +} + +void WebSocketServer::Ping(const std::vector &payload) { + assert(open_); + assert(payload.size() <= 125); + SendHeader(true, (int)Opcode::PING, payload.size()); + SendBytes((const char *)&payload[0], payload.size()); +} + +void WebSocketServer::Pong(const std::vector &payload) { + assert(open_); + assert(payload.size() <= 125); + SendHeader(true, (int)Opcode::PONG, payload.size()); + SendBytes((const char *)&payload[0], payload.size()); +} + +void WebSocketServer::Close(WebSocketClose reason) { + closeReason_ = reason; + if (reason == WebSocketClose::NO_STATUS) { + // This means we received a CLOSE without a code. + SendHeader(true, (int)Opcode::CLOSE, 0); + } else { + SendHeader(true, (int)Opcode::CLOSE, 2); + + uint16_t r = (uint16_t)reason; + uint8_t reasonData[] = { + (uint8_t)((r >> 8) & 0xFF), + (uint8_t)((r >> 0) & 0xFF), + }; + SendBytes((const char *)reasonData, sizeof(reasonData)); + } + + sentClose_ = true; +} + +bool WebSocketServer::Process(float timeout) { + if (!open_) { + return false; + } + + SendFlush(); + + if (outBuf_.empty() && out_->Empty() && sentClose_) { + // Okay, we've sent the close. Don't wait for anything else (whether we got a close or not.) + open_ = false; + return false; + } + + struct timeval tv; + tv.tv_sec = floor(timeout); + tv.tv_usec = (timeout - floor(timeout)) * 1000000.0; + + fd_set read; + FD_ZERO(&read); + // In case we closed due to protocol error, don't even try to read. + if (!sentClose_) { + FD_SET(fd_, &read); + } + + fd_set write; + FD_ZERO(&write); + if (!outBuf_.empty() || !out_->Empty()) { + FD_SET(fd_, &write); + } + + // First argument to select is the highest socket in the set + 1. + int rval = select((int)fd_ + 1, &read, &write, nullptr, &tv); + if (rval < 0) { + // Something went wrong with the select() call. + // TODO: Could be EINTR, for now returning true... + return true; + } + + if (rval == 0) { + // Timed out. + return true; + } + + if (FD_ISSET(fd_, &write)) { + SendFlush(); + } + if (FD_ISSET(fd_, &read)) { + if (in_->Empty() && !in_->TryFill()) { + // Since select said it was readable, we assume this means disconnect. + closeReason_ = WebSocketClose::ABNORMAL; + open_ = false; + return false; + } + + while (ReadFrames() && !in_->Empty()) + continue; + } + + return true; +} + +bool WebSocketServer::ReadFrames() { + if (pendingLeft_ != 0) { + return ReadPending(); + } + + return ReadFrame(); +} + +bool WebSocketServer::ReadFrame() { + assert(pendingLeft_ == 0); + + // TODO: For now blocking on header trickle, shouldn't be common. + auto readExact = [&](void *p, size_t sz) { + if (!in_->TakeExact((char *)p, sz)) { + // TODO: Failing on too slow trickle timeout for now. + Close(WebSocketClose::POLICY_VIOLATION); + return false; + } + return true; + }; + + // Client frames are always between 6 and 14 bytes. We start with 6. + uint8_t header[14]; + if (!readExact(header, 6)) + return false; + + // Don't allow reserved bits to be set, require masking. + if ((header[0] & 0x70) != 0 || (header[1] & 0x80) == 0) { + Close(WebSocketClose::PROTOCOL_ERROR); + return false; + } + + const bool fin = (header[0] & 0x80) != 0; + const int opcode = header[0] & 0x0F; + uint64_t sz = header[1] & 0x7F; + const uint8_t *mask = &header[2]; + + if (opcode >= (int)Opcode::CONTROL_MIN && (sz > 125 || !fin)) { + // Control frames must be <= 125 bytes. + Close(WebSocketClose::PROTOCOL_ERROR); + return false; + } + + if (opcode > (int)Opcode::CONTROL_MAX || (opcode > (int)Opcode::PAYLOAD_MAX && opcode < (int)Opcode::CONTROL_MIN)) { + // Undefined opcode. + Close(WebSocketClose::PROTOCOL_ERROR); + return false; + } + + if (!pendingFin_ && opcode == (int)Opcode::CONTINUE) { + // Can't continue what you haven't started. + Close(WebSocketClose::PROTOCOL_ERROR); + return false; + } + if (pendingFin_ && opcode != (int)Opcode::CONTINUE && opcode < (int)Opcode::CONTROL_MIN) { + // Can't start something else until you finish your thought. + Close(WebSocketClose::PROTOCOL_ERROR); + return false; + } + + if (sz == 126) { + // Read the rest of the mask. + if (!readExact((char *)&header[6], 2)) + return false; + + mask = &header[4]; + sz = (header[2] << 8) | (header[3] << 0); + } else if (sz == 127) { + // We only have half the size so far - read the rest, and the mask. + if (!readExact((char *)&header[6], 8)) + return false; + + mask = &header[10]; + // Read from big endian. + uint64_t high = (header[2] << 24) | (header[3] << 16) || (header[4] << 8) | (header[5] << 0); + uint64_t low = (header[6] << 24) | (header[7] << 16) || (header[8] << 8) | (header[9] << 0); + sz = (high << 32) | low; + + if ((sz & 0x8000000000000000ULL) != 0) { + Close(WebSocketClose::PROTOCOL_ERROR); + return false; + } + } + + if (opcode >= (int)Opcode::CONTROL_MIN) { + // It's safe to overwrite this since we can be between fragmented frames, but not inside a frame. + memcpy(pendingMask_, mask, sizeof(pendingMask_)); + return ReadControlFrame(opcode, sz); + } + + // The data could be split among many TCP packets, so read it as it comes. + if (!pendingFin_) + pendingOpcode_ = opcode; + pendingFin_ = !fin; + pendingLeft_ = sz; + memcpy(pendingMask_, mask, sizeof(pendingMask_)); + + // Payload data is actually read in ReadPending(). + return true; +} + +bool WebSocketServer::ReadPending() { + size_t pos = pendingBuf_.size(); + pendingBuf_.resize(pendingBuf_.size() + pendingLeft_); + + // Read what we can. + size_t readBytes = in_->TakeAtMost((char *)&pendingBuf_[pos], pendingLeft_); + for (size_t i = 0; i < readBytes; ++i) { + pendingBuf_[pos + i] ^= pendingMask_[i & 3]; + } + pendingLeft_ -= readBytes; + + if (pendingLeft_ != 0) { + // Still more to read. Careful: we might need to rotate the mask. + // Example: if we read only 3 bytes, next read should start at fourth byte in mask. + int offset = readBytes & 3; + if (offset) { + uint8_t orig[4]; + memcpy(orig, pendingMask_, sizeof(orig)); + for (size_t i = 0; i < sizeof(orig); ++i) { + pendingMask_[i] = orig[(offset + i) & 3]; + } + } + + // Truncate out the unread bytes for next time. + pendingBuf_.resize(pos + readBytes); + return true; + } + + // We're done, but were we waiting for a FIN packet? + if (pendingFin_) + return true; + + if (pendingOpcode_ == (int)Opcode::TEXT) { + if (text_) { + text_(std::string(pendingBuf_.begin(), pendingBuf_.end())); + } + } else if (pendingOpcode_ == (int)Opcode::BINARY) { + if (binary_) { + binary_(pendingBuf_); + } + } else { + assert(false); + } + + // All done, clear it out. + pendingBuf_.clear(); + pendingOpcode_ = -1; + + return true; +} + +bool WebSocketServer::ReadControlFrame(int opcode, size_t sz) { + std::vector payload; + payload.resize(sz); + // Just block here to read the payload. + if (!in_->TakeExact((char *)&payload[0], sz)) { + // TODO: Failing on too slow trickle timeout for now. + Close(WebSocketClose::POLICY_VIOLATION); + return false; + } + + for (size_t i = 0; i < sz; ++i) { + payload[i] ^= pendingMask_[i & 3]; + } + + if (opcode == (int)Opcode::PING) { + Pong(payload); + // Try to send immediately if possible, but don't block. + SendFlush(); + + if (ping_) { + ping_(payload); + } + } else if (opcode == (int)Opcode::PONG) { + if (pong_) { + pong_(payload); + } + } else if (opcode == (int)Opcode::CLOSE) { + if (payload.size() >= 2) { + uint16_t reason = (payload[0] << 8) | payload[1]; + // Send back a close right away. + Close(WebSocketClose(reason)); + } else { + Close(WebSocketClose::NO_STATUS); + } + // Don't read anything more. + return false; + } else { + assert(false); + } + + return true; +} + +void WebSocketServer::SendHeader(bool fin, int opcode, size_t sz) { + assert((opcode & 0x0F) == opcode); + uint8_t frameHeader = (fin ? 0x80 : 0x00) | opcode; + SendBytes(&frameHeader, 1); + + // We never mask from the server. + if (sz <= 125) { + uint8_t frameSize = (int8_t)sz; + SendBytes(&frameSize, 1); + } else if (sz <= 0xFFFF) { + uint8_t frameSize[] = { + 126, + (uint8_t)((sz >> 8) & 0xFF), + (uint8_t)((sz >> 0) & 0xFF), + }; + SendBytes(frameSize, sizeof(frameSize)); + } else { + uint64_t sz64 = sz; + assert((sz64 & 0x8000000000000000ULL) == 0); + uint8_t frameSize[] = { + 127, + (uint8_t)((sz64 >> 56) & 0xFF), + (uint8_t)((sz64 >> 48) & 0xFF), + (uint8_t)((sz64 >> 40) & 0xFF), + (uint8_t)((sz64 >> 32) & 0xFF), + (uint8_t)((sz64 >> 24) & 0xFF), + (uint8_t)((sz64 >> 16) & 0xFF), + (uint8_t)((sz64 >> 8) & 0xFF), + (uint8_t)((sz64 >> 0) & 0xFF), + }; + SendBytes(frameSize, sizeof(frameSize)); + } +} + +void WebSocketServer::SendBytes(const void *p, size_t sz) { + const char *data = (const char *)p; + if (outBuf_.empty()) { + size_t pushed = out_->PushAtMost(data, sz); + data += pushed; + sz -= pushed; + } + + if (sz != 0) { + size_t pos = outBuf_.size(); + outBuf_.resize(pos + sz); + memcpy(&outBuf_[pos], data, sz); + } +} + +void WebSocketServer::SendFlush() { + out_->Flush(false); + + // Drain out as much of our buffer as possible. + size_t totalPushed = 0; + while (!outBuf_.empty()) { + size_t pushed = out_->PushAtMost((const char *)&outBuf_[totalPushed], outBuf_.size() - totalPushed); + if (pushed == 0) + break; + + totalPushed += pushed; + out_->Flush(false); + } + + if (totalPushed != 0) { + // Hopefully this is usually the entire buffer. + outBuf_.erase(outBuf_.begin(), outBuf_.begin() + totalPushed); + } +} + +}; diff --git a/ext/native/net/websocket_server.h b/ext/native/net/websocket_server.h new file mode 100644 index 000000000000..98426f53cf4e --- /dev/null +++ b/ext/native/net/websocket_server.h @@ -0,0 +1,104 @@ +#pragma once + +#include +#include +#include "net/http_server.h" +#include "net/sinks.h" + +namespace net { + +enum class WebSocketClose : uint16_t { + NORMAL = 1000, + GOING_AWAY = 1001, + PROTOCOL_ERROR = 1002, + UNSUPPORTED_DATA = 1003, + INVALID_DATA = 1007, + POLICY_VIOLATION = 1008, + MESSAGE_TOO_LONG = 1009, + MISSING_EXTENSION = 1010, + INTERNAL_ERROR = 1011, + SERVICE_RESTART = 1012, + TRY_AGAIN_LATER = 1013, + BAD_GATEWAY = 1014, + + NO_STATUS = 1005, + ABNORMAL = 1006, +}; + +// RFC 6455 +class WebSocketServer { +public: + static WebSocketServer *CreateAsUpgrade(const http::Request &request, const std::string &protocol = ""); + + void Send(const std::string &str); + void Send(const std::vector &payload); + + // Call with finish = false to start and continue, then finally with finish = true to complete. + // Note: Fragmented data cannot be interleaved, per protocol. + void AddFragment(bool finish, const std::string &str); + void AddFragment(bool finish, const std::vector &payload); + + void Ping(const std::vector &payload = {}); + void Pong(const std::vector &payload = {}); + void Close(WebSocketClose reason = WebSocketClose::GOING_AWAY); + + // Note: may interrupt early. Call in a loop. + bool Process(float timeout = -1.0f); + + void SetTextHandler(std::function func) { + text_ = func; + } + void SetBinaryHandler(std::function &)> func) { + binary_ = func; + } + // Doesn't need to send a Pong. + void SetPingHandler(std::function &)> func) { + ping_ = func; + } + void SetPongHandler(std::function &)> func) { + pong_ = func; + } + + bool IsOpen() { + return open_; + } + WebSocketClose CloseReason() { + return closeReason_; + } + +protected: + WebSocketServer(size_t fd, InputSink *in, OutputSink *out) : fd_(fd), in_(in), out_(out) { + } + + void SendHeader(bool fin, int opcode, size_t sz); + void SendBytes(const void *p, size_t sz); + void SendFlush(); + bool ReadFrames(); + bool ReadFrame(); + bool ReadPending(); + bool ReadControlFrame(int opcode, size_t sz); + + bool open_ = true; + bool sentClose_ = false; + int fragmentOpcode_ = -1; + size_t fd_ = 0; + InputSink *in_ = nullptr; + OutputSink *out_ = nullptr; + WebSocketClose closeReason_ = WebSocketClose::NO_STATUS; + std::vector outBuf_; + + std::vector pendingBuf_; + uint8_t pendingMask_[4]{}; + // Bytes left to read in the frame (in case of a partial frame read.) + uint64_t pendingLeft_ = 0; + int pendingOpcode_ = -1; + // Waiting for a frame with FIN. + bool pendingFin_ = false; + + std::function text_; + std::function &)> binary_; + std::function &)> ping_; + std::function &)> pong_; +}; + +}; diff --git a/ext/native/thread/executor.cpp b/ext/native/thread/executor.cpp index 8c5d6cd0977c..197f9d7920ab 100644 --- a/ext/native/thread/executor.cpp +++ b/ext/native/thread/executor.cpp @@ -1,11 +1,16 @@ #include "thread/executor.h" #include +#include namespace threading { void SameThreadExecutor::Run(std::function func) { - func(); + func(); +} + +void NewThreadExecutor::Run(std::function func) { + std::thread(func).detach(); } } // namespace threading diff --git a/ext/native/thread/executor.h b/ext/native/thread/executor.h index 7a5b5286a176..6d6515920c8e 100644 --- a/ext/native/thread/executor.h +++ b/ext/native/thread/executor.h @@ -6,13 +6,18 @@ namespace threading { // Stuff that can execute other stuff, like threadpools, should inherit from this. class Executor { - public: - virtual void Run(std::function func) = 0; +public: + virtual void Run(std::function func) = 0; }; class SameThreadExecutor : public Executor { - public: - virtual void Run(std::function func); +public: + void Run(std::function func) override; +}; + +class NewThreadExecutor : public Executor { +public: + void Run(std::function func) override; }; } // namespace threading