Skip to content

Commit

Permalink
Add EvconnlistenerBase to avoid void* casts (apache#1641)
Browse files Browse the repository at this point in the history
  • Loading branch information
PragmaTwice authored and p1u3o committed Aug 15, 2023
1 parent 2dec905 commit e2b1e00
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 26 deletions.
16 changes: 16 additions & 0 deletions src/common/event_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include "event2/buffer.h"
#include "event2/bufferevent.h"
#include "event2/event.h"
#include "event2/listener.h"

template <typename F, F *f>
struct StaticFunction {
Expand Down Expand Up @@ -120,3 +121,18 @@ struct EventCallbackBase {

event *NewTimer(event_base *base) { return evtimer_new(base, timerCB, reinterpret_cast<void *>(this)); }
};

template <typename Derived>
struct EvconnlistenerBase {
private:
template <void (Derived::*cb)(evconnlistener *, evutil_socket_t, sockaddr *, int)>
static void callback(evconnlistener *listener, evutil_socket_t fd, sockaddr *address, int socklen, void *ctx) {
return (reinterpret_cast<Derived *>(ctx)->*cb)(listener, fd, address, socklen);
}

public:
template <void (Derived::*cb)(evconnlistener *, evutil_socket_t, sockaddr *, int)>
evconnlistener *NewEvconnlistener(event_base *base, unsigned flags, int backlog, evutil_socket_t fd) {
return evconnlistener_new(base, callback<cb>, this, flags, backlog, fd);
}
};
41 changes: 19 additions & 22 deletions src/server/worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -106,10 +106,9 @@ void Worker::TimerCB(int, int16_t events) {
KickoutIdleClients(config->timeout);
}

void Worker::newTCPConnection(evconnlistener *listener, evutil_socket_t fd, sockaddr *address, int socklen, void *ctx) {
auto worker = static_cast<Worker *>(ctx);
void Worker::newTCPConnection(evconnlistener *listener, evutil_socket_t fd, sockaddr *address, int socklen) {
int local_port = util::GetLocalPort(fd); // NOLINT
DLOG(INFO) << "[worker] New connection: fd=" << fd << " from port: " << local_port << " thread #" << worker->tid_;
DLOG(INFO) << "[worker] New connection: fd=" << fd << " from port: " << local_port << " thread #" << tid_;

auto s = util::SockSetTcpKeepalive(fd, 120);
if (!s.IsOK()) {
Expand All @@ -132,8 +131,8 @@ void Worker::newTCPConnection(evconnlistener *listener, evutil_socket_t fd, sock
bufferevent *bev = nullptr;
#ifdef ENABLE_OPENSSL
SSL *ssl = nullptr;
if (uint32_t(local_port) == worker->svr->GetConfig()->tls_port) {
ssl = SSL_new(worker->svr->ssl_ctx.get());
if (uint32_t(local_port) == svr->GetConfig()->tls_port) {
ssl = SSL_new(svr->ssl_ctx.get());
if (!ssl) {
LOG(ERROR) << "Failed to construct SSL structure for new connection: " << SSLErrors{};
evutil_closesocket(fd);
Expand All @@ -158,15 +157,15 @@ void Worker::newTCPConnection(evconnlistener *listener, evutil_socket_t fd, sock
return;
}
#ifdef ENABLE_OPENSSL
if (uint32_t(local_port) == worker->svr->GetConfig()->tls_port) {
if (uint32_t(local_port) == svr->GetConfig()->tls_port) {
bufferevent_openssl_set_allow_dirty_shutdown(bev, 1);
}
#endif
auto conn = new redis::Connection(bev, worker);
auto conn = new redis::Connection(bev, this);
conn->SetCB(bev);
bufferevent_enable(bev, EV_READ);

s = worker->AddConnection(conn);
s = AddConnection(conn);
if (!s.IsOK()) {
std::string err_msg = redis::Error("ERR " + s.Msg());
s = util::SockSend(fd, err_msg);
Expand All @@ -183,26 +182,24 @@ void Worker::newTCPConnection(evconnlistener *listener, evutil_socket_t fd, sock
conn->SetAddr(ip, port);
}

if (worker->rate_limit_group_) {
bufferevent_add_to_rate_limit_group(bev, worker->rate_limit_group_);
if (rate_limit_group_) {
bufferevent_add_to_rate_limit_group(bev, rate_limit_group_);
}
}

void Worker::newUnixSocketConnection(evconnlistener *listener, evutil_socket_t fd, sockaddr *address, int socklen,
void *ctx) {
auto worker = static_cast<Worker *>(ctx);
DLOG(INFO) << "[worker] New connection: fd=" << fd << " from unixsocket: " << worker->svr->GetConfig()->unixsocket
<< " thread #" << worker->tid_;
void Worker::newUnixSocketConnection(evconnlistener *listener, evutil_socket_t fd, sockaddr *address, int socklen) {
DLOG(INFO) << "[worker] New connection: fd=" << fd << " from unixsocket: " << svr->GetConfig()->unixsocket
<< " thread #" << tid_;
event_base *base = evconnlistener_get_base(listener);
auto ev_thread_safe_flags =
BEV_OPT_THREADSAFE | BEV_OPT_DEFER_CALLBACKS | BEV_OPT_UNLOCK_CALLBACKS | BEV_OPT_CLOSE_ON_FREE;
bufferevent *bev = bufferevent_socket_new(base, fd, ev_thread_safe_flags);

auto conn = new redis::Connection(bev, worker);
auto conn = new redis::Connection(bev, this);
conn->SetCB(bev);
bufferevent_enable(bev, EV_READ);

auto s = worker->AddConnection(conn);
auto s = AddConnection(conn);
if (!s.IsOK()) {
std::string err_msg = redis::Error("ERR " + s.Msg());
s = util::SockSend(fd, err_msg);
Expand All @@ -213,9 +210,9 @@ void Worker::newUnixSocketConnection(evconnlistener *listener, evutil_socket_t f
return;
}

conn->SetAddr(worker->svr->GetConfig()->unixsocket, 0);
if (worker->rate_limit_group_) {
bufferevent_add_to_rate_limit_group(bev, worker->rate_limit_group_);
conn->SetAddr(svr->GetConfig()->unixsocket, 0);
if (rate_limit_group_) {
bufferevent_add_to_rate_limit_group(bev, rate_limit_group_);
}
}

Expand Down Expand Up @@ -256,7 +253,7 @@ Status Worker::listenTCP(const std::string &host, uint32_t port, int backlog) {
}

evutil_make_socket_nonblocking(fd);
auto lev = evconnlistener_new(base_, newTCPConnection, this, LEV_OPT_CLOSE_ON_FREE, backlog, fd);
auto lev = NewEvconnlistener<&Worker::newTCPConnection>(base_, LEV_OPT_CLOSE_ON_FREE, backlog, fd);
listen_events_.emplace_back(lev);
}

Expand All @@ -282,7 +279,7 @@ Status Worker::ListenUnixSocket(const std::string &path, int perm, int backlog)
}

evutil_make_socket_nonblocking(fd);
auto lev = evconnlistener_new(base_, newUnixSocketConnection, this, LEV_OPT_CLOSE_ON_FREE, backlog, fd);
auto lev = NewEvconnlistener<&Worker::newUnixSocketConnection>(base_, LEV_OPT_CLOSE_ON_FREE, backlog, fd);
listen_events_.emplace_back(lev);
if (perm != 0) {
chmod(sa.sun_path, (mode_t)perm);
Expand Down
7 changes: 3 additions & 4 deletions src/server/worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@

class Server;

class Worker : EventCallbackBase<Worker> {
class Worker : EventCallbackBase<Worker>, EvconnlistenerBase<Worker> {
public:
Worker(Server *svr, Config *config);
~Worker();
Expand Down Expand Up @@ -76,9 +76,8 @@ class Worker : EventCallbackBase<Worker> {

private:
Status listenTCP(const std::string &host, uint32_t port, int backlog);
static void newTCPConnection(evconnlistener *listener, evutil_socket_t fd, sockaddr *address, int socklen, void *ctx);
static void newUnixSocketConnection(evconnlistener *listener, evutil_socket_t fd, sockaddr *address, int socklen,
void *ctx);
void newTCPConnection(evconnlistener *listener, evutil_socket_t fd, sockaddr *address, int socklen);
void newUnixSocketConnection(evconnlistener *listener, evutil_socket_t fd, sockaddr *address, int socklen);
redis::Connection *removeConnection(int fd);

event_base *base_;
Expand Down

0 comments on commit e2b1e00

Please sign in to comment.