Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add EvconnlistenerBase to avoid void* casts #1641

Merged
merged 6 commits into from
Aug 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions src/common/event_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include "event2/buffer.h"
#include "event2/bufferevent.h"
#include "event2/event.h"
#include "event2/listener.h"

template <typename F, F *f>
struct StaticFunction {
Expand Down Expand Up @@ -120,3 +121,18 @@ struct EventCallbackBase {

event *NewTimer(event_base *base) { return evtimer_new(base, timerCB, reinterpret_cast<void *>(this)); }
};

template <typename Derived>
struct EvconnlistenerBase {
private:
template <void (Derived::*cb)(evconnlistener *, evutil_socket_t, sockaddr *, int)>
static void callback(evconnlistener *listener, evutil_socket_t fd, sockaddr *address, int socklen, void *ctx) {
return (reinterpret_cast<Derived *>(ctx)->*cb)(listener, fd, address, socklen);
}

public:
template <void (Derived::*cb)(evconnlistener *, evutil_socket_t, sockaddr *, int)>
evconnlistener *NewEvconnlistener(event_base *base, unsigned flags, int backlog, evutil_socket_t fd) {
return evconnlistener_new(base, callback<cb>, this, flags, backlog, fd);
}
};
41 changes: 19 additions & 22 deletions src/server/worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -106,10 +106,9 @@ void Worker::TimerCB(int, int16_t events) {
KickoutIdleClients(config->timeout);
}

void Worker::newTCPConnection(evconnlistener *listener, evutil_socket_t fd, sockaddr *address, int socklen, void *ctx) {
auto worker = static_cast<Worker *>(ctx);
void Worker::newTCPConnection(evconnlistener *listener, evutil_socket_t fd, sockaddr *address, int socklen) {
int local_port = util::GetLocalPort(fd); // NOLINT
DLOG(INFO) << "[worker] New connection: fd=" << fd << " from port: " << local_port << " thread #" << worker->tid_;
DLOG(INFO) << "[worker] New connection: fd=" << fd << " from port: " << local_port << " thread #" << tid_;

auto s = util::SockSetTcpKeepalive(fd, 120);
if (!s.IsOK()) {
Expand All @@ -132,8 +131,8 @@ void Worker::newTCPConnection(evconnlistener *listener, evutil_socket_t fd, sock
bufferevent *bev = nullptr;
#ifdef ENABLE_OPENSSL
SSL *ssl = nullptr;
if (uint32_t(local_port) == worker->svr->GetConfig()->tls_port) {
ssl = SSL_new(worker->svr->ssl_ctx.get());
if (uint32_t(local_port) == svr->GetConfig()->tls_port) {
ssl = SSL_new(svr->ssl_ctx.get());
if (!ssl) {
LOG(ERROR) << "Failed to construct SSL structure for new connection: " << SSLErrors{};
evutil_closesocket(fd);
Expand All @@ -158,15 +157,15 @@ void Worker::newTCPConnection(evconnlistener *listener, evutil_socket_t fd, sock
return;
}
#ifdef ENABLE_OPENSSL
if (uint32_t(local_port) == worker->svr->GetConfig()->tls_port) {
if (uint32_t(local_port) == svr->GetConfig()->tls_port) {
bufferevent_openssl_set_allow_dirty_shutdown(bev, 1);
}
#endif
auto conn = new redis::Connection(bev, worker);
auto conn = new redis::Connection(bev, this);
conn->SetCB(bev);
bufferevent_enable(bev, EV_READ);

s = worker->AddConnection(conn);
s = AddConnection(conn);
if (!s.IsOK()) {
std::string err_msg = redis::Error("ERR " + s.Msg());
s = util::SockSend(fd, err_msg);
Expand All @@ -183,26 +182,24 @@ void Worker::newTCPConnection(evconnlistener *listener, evutil_socket_t fd, sock
conn->SetAddr(ip, port);
}

if (worker->rate_limit_group_) {
bufferevent_add_to_rate_limit_group(bev, worker->rate_limit_group_);
if (rate_limit_group_) {
bufferevent_add_to_rate_limit_group(bev, rate_limit_group_);
}
}

void Worker::newUnixSocketConnection(evconnlistener *listener, evutil_socket_t fd, sockaddr *address, int socklen,
void *ctx) {
auto worker = static_cast<Worker *>(ctx);
DLOG(INFO) << "[worker] New connection: fd=" << fd << " from unixsocket: " << worker->svr->GetConfig()->unixsocket
<< " thread #" << worker->tid_;
void Worker::newUnixSocketConnection(evconnlistener *listener, evutil_socket_t fd, sockaddr *address, int socklen) {
DLOG(INFO) << "[worker] New connection: fd=" << fd << " from unixsocket: " << svr->GetConfig()->unixsocket
<< " thread #" << tid_;
event_base *base = evconnlistener_get_base(listener);
auto ev_thread_safe_flags =
BEV_OPT_THREADSAFE | BEV_OPT_DEFER_CALLBACKS | BEV_OPT_UNLOCK_CALLBACKS | BEV_OPT_CLOSE_ON_FREE;
bufferevent *bev = bufferevent_socket_new(base, fd, ev_thread_safe_flags);

auto conn = new redis::Connection(bev, worker);
auto conn = new redis::Connection(bev, this);
conn->SetCB(bev);
bufferevent_enable(bev, EV_READ);

auto s = worker->AddConnection(conn);
auto s = AddConnection(conn);
if (!s.IsOK()) {
std::string err_msg = redis::Error("ERR " + s.Msg());
s = util::SockSend(fd, err_msg);
Expand All @@ -213,9 +210,9 @@ void Worker::newUnixSocketConnection(evconnlistener *listener, evutil_socket_t f
return;
}

conn->SetAddr(worker->svr->GetConfig()->unixsocket, 0);
if (worker->rate_limit_group_) {
bufferevent_add_to_rate_limit_group(bev, worker->rate_limit_group_);
conn->SetAddr(svr->GetConfig()->unixsocket, 0);
if (rate_limit_group_) {
bufferevent_add_to_rate_limit_group(bev, rate_limit_group_);
}
}

Expand Down Expand Up @@ -256,7 +253,7 @@ Status Worker::listenTCP(const std::string &host, uint32_t port, int backlog) {
}

evutil_make_socket_nonblocking(fd);
auto lev = evconnlistener_new(base_, newTCPConnection, this, LEV_OPT_CLOSE_ON_FREE, backlog, fd);
auto lev = NewEvconnlistener<&Worker::newTCPConnection>(base_, LEV_OPT_CLOSE_ON_FREE, backlog, fd);
listen_events_.emplace_back(lev);
}

Expand All @@ -282,7 +279,7 @@ Status Worker::ListenUnixSocket(const std::string &path, int perm, int backlog)
}

evutil_make_socket_nonblocking(fd);
auto lev = evconnlistener_new(base_, newUnixSocketConnection, this, LEV_OPT_CLOSE_ON_FREE, backlog, fd);
auto lev = NewEvconnlistener<&Worker::newUnixSocketConnection>(base_, LEV_OPT_CLOSE_ON_FREE, backlog, fd);
listen_events_.emplace_back(lev);
if (perm != 0) {
chmod(sa.sun_path, (mode_t)perm);
Expand Down
7 changes: 3 additions & 4 deletions src/server/worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@

class Server;

class Worker : EventCallbackBase<Worker> {
class Worker : EventCallbackBase<Worker>, EvconnlistenerBase<Worker> {
public:
Worker(Server *svr, Config *config);
~Worker();
Expand Down Expand Up @@ -76,9 +76,8 @@ class Worker : EventCallbackBase<Worker> {

private:
Status listenTCP(const std::string &host, uint32_t port, int backlog);
static void newTCPConnection(evconnlistener *listener, evutil_socket_t fd, sockaddr *address, int socklen, void *ctx);
static void newUnixSocketConnection(evconnlistener *listener, evutil_socket_t fd, sockaddr *address, int socklen,
void *ctx);
void newTCPConnection(evconnlistener *listener, evutil_socket_t fd, sockaddr *address, int socklen);
void newUnixSocketConnection(evconnlistener *listener, evutil_socket_t fd, sockaddr *address, int socklen);
redis::Connection *removeConnection(int fd);

event_base *base_;
Expand Down