diff --git a/src/common/event_util.h b/src/common/event_util.h index 8e92c062824..556f40a4417 100644 --- a/src/common/event_util.h +++ b/src/common/event_util.h @@ -27,6 +27,7 @@ #include "event2/buffer.h" #include "event2/bufferevent.h" #include "event2/event.h" +#include "event2/listener.h" template struct StaticFunction { @@ -120,3 +121,18 @@ struct EventCallbackBase { event *NewTimer(event_base *base) { return evtimer_new(base, timerCB, reinterpret_cast(this)); } }; + +template +struct EvconnlistenerBase { + private: + template + static void callback(evconnlistener *listener, evutil_socket_t fd, sockaddr *address, int socklen, void *ctx) { + return (reinterpret_cast(ctx)->*cb)(listener, fd, address, socklen); + } + + public: + template + evconnlistener *NewEvconnlistener(event_base *base, unsigned flags, int backlog, evutil_socket_t fd) { + return evconnlistener_new(base, callback, this, flags, backlog, fd); + } +}; diff --git a/src/server/worker.cc b/src/server/worker.cc index befaf97a10d..788229cfd62 100644 --- a/src/server/worker.cc +++ b/src/server/worker.cc @@ -106,10 +106,9 @@ void Worker::TimerCB(int, int16_t events) { KickoutIdleClients(config->timeout); } -void Worker::newTCPConnection(evconnlistener *listener, evutil_socket_t fd, sockaddr *address, int socklen, void *ctx) { - auto worker = static_cast(ctx); +void Worker::newTCPConnection(evconnlistener *listener, evutil_socket_t fd, sockaddr *address, int socklen) { int local_port = util::GetLocalPort(fd); // NOLINT - DLOG(INFO) << "[worker] New connection: fd=" << fd << " from port: " << local_port << " thread #" << worker->tid_; + DLOG(INFO) << "[worker] New connection: fd=" << fd << " from port: " << local_port << " thread #" << tid_; auto s = util::SockSetTcpKeepalive(fd, 120); if (!s.IsOK()) { @@ -132,8 +131,8 @@ void Worker::newTCPConnection(evconnlistener *listener, evutil_socket_t fd, sock bufferevent *bev = nullptr; #ifdef ENABLE_OPENSSL SSL *ssl = nullptr; - if (uint32_t(local_port) == worker->svr->GetConfig()->tls_port) { - ssl = SSL_new(worker->svr->ssl_ctx.get()); + if (uint32_t(local_port) == svr->GetConfig()->tls_port) { + ssl = SSL_new(svr->ssl_ctx.get()); if (!ssl) { LOG(ERROR) << "Failed to construct SSL structure for new connection: " << SSLErrors{}; evutil_closesocket(fd); @@ -158,15 +157,15 @@ void Worker::newTCPConnection(evconnlistener *listener, evutil_socket_t fd, sock return; } #ifdef ENABLE_OPENSSL - if (uint32_t(local_port) == worker->svr->GetConfig()->tls_port) { + if (uint32_t(local_port) == svr->GetConfig()->tls_port) { bufferevent_openssl_set_allow_dirty_shutdown(bev, 1); } #endif - auto conn = new redis::Connection(bev, worker); + auto conn = new redis::Connection(bev, this); conn->SetCB(bev); bufferevent_enable(bev, EV_READ); - s = worker->AddConnection(conn); + s = AddConnection(conn); if (!s.IsOK()) { std::string err_msg = redis::Error("ERR " + s.Msg()); s = util::SockSend(fd, err_msg); @@ -183,26 +182,24 @@ void Worker::newTCPConnection(evconnlistener *listener, evutil_socket_t fd, sock conn->SetAddr(ip, port); } - if (worker->rate_limit_group_) { - bufferevent_add_to_rate_limit_group(bev, worker->rate_limit_group_); + if (rate_limit_group_) { + bufferevent_add_to_rate_limit_group(bev, rate_limit_group_); } } -void Worker::newUnixSocketConnection(evconnlistener *listener, evutil_socket_t fd, sockaddr *address, int socklen, - void *ctx) { - auto worker = static_cast(ctx); - DLOG(INFO) << "[worker] New connection: fd=" << fd << " from unixsocket: " << worker->svr->GetConfig()->unixsocket - << " thread #" << worker->tid_; +void Worker::newUnixSocketConnection(evconnlistener *listener, evutil_socket_t fd, sockaddr *address, int socklen) { + DLOG(INFO) << "[worker] New connection: fd=" << fd << " from unixsocket: " << svr->GetConfig()->unixsocket + << " thread #" << tid_; event_base *base = evconnlistener_get_base(listener); auto ev_thread_safe_flags = BEV_OPT_THREADSAFE | BEV_OPT_DEFER_CALLBACKS | BEV_OPT_UNLOCK_CALLBACKS | BEV_OPT_CLOSE_ON_FREE; bufferevent *bev = bufferevent_socket_new(base, fd, ev_thread_safe_flags); - auto conn = new redis::Connection(bev, worker); + auto conn = new redis::Connection(bev, this); conn->SetCB(bev); bufferevent_enable(bev, EV_READ); - auto s = worker->AddConnection(conn); + auto s = AddConnection(conn); if (!s.IsOK()) { std::string err_msg = redis::Error("ERR " + s.Msg()); s = util::SockSend(fd, err_msg); @@ -213,9 +210,9 @@ void Worker::newUnixSocketConnection(evconnlistener *listener, evutil_socket_t f return; } - conn->SetAddr(worker->svr->GetConfig()->unixsocket, 0); - if (worker->rate_limit_group_) { - bufferevent_add_to_rate_limit_group(bev, worker->rate_limit_group_); + conn->SetAddr(svr->GetConfig()->unixsocket, 0); + if (rate_limit_group_) { + bufferevent_add_to_rate_limit_group(bev, rate_limit_group_); } } @@ -256,7 +253,7 @@ Status Worker::listenTCP(const std::string &host, uint32_t port, int backlog) { } evutil_make_socket_nonblocking(fd); - auto lev = evconnlistener_new(base_, newTCPConnection, this, LEV_OPT_CLOSE_ON_FREE, backlog, fd); + auto lev = NewEvconnlistener<&Worker::newTCPConnection>(base_, LEV_OPT_CLOSE_ON_FREE, backlog, fd); listen_events_.emplace_back(lev); } @@ -282,7 +279,7 @@ Status Worker::ListenUnixSocket(const std::string &path, int perm, int backlog) } evutil_make_socket_nonblocking(fd); - auto lev = evconnlistener_new(base_, newUnixSocketConnection, this, LEV_OPT_CLOSE_ON_FREE, backlog, fd); + auto lev = NewEvconnlistener<&Worker::newUnixSocketConnection>(base_, LEV_OPT_CLOSE_ON_FREE, backlog, fd); listen_events_.emplace_back(lev); if (perm != 0) { chmod(sa.sun_path, (mode_t)perm); diff --git a/src/server/worker.h b/src/server/worker.h index 63561ba8854..70cdace2f2b 100644 --- a/src/server/worker.h +++ b/src/server/worker.h @@ -42,7 +42,7 @@ class Server; -class Worker : EventCallbackBase { +class Worker : EventCallbackBase, EvconnlistenerBase { public: Worker(Server *svr, Config *config); ~Worker(); @@ -76,9 +76,8 @@ class Worker : EventCallbackBase { private: Status listenTCP(const std::string &host, uint32_t port, int backlog); - static void newTCPConnection(evconnlistener *listener, evutil_socket_t fd, sockaddr *address, int socklen, void *ctx); - static void newUnixSocketConnection(evconnlistener *listener, evutil_socket_t fd, sockaddr *address, int socklen, - void *ctx); + void newTCPConnection(evconnlistener *listener, evutil_socket_t fd, sockaddr *address, int socklen); + void newUnixSocketConnection(evconnlistener *listener, evutil_socket_t fd, sockaddr *address, int socklen); redis::Connection *removeConnection(int fd); event_base *base_;