diff --git a/pdns/dnsdist.cc b/pdns/dnsdist.cc index 3df62e18e2d6e..df3db2dc9418e 100644 --- a/pdns/dnsdist.cc +++ b/pdns/dnsdist.cc @@ -805,7 +805,7 @@ void responderThread(std::shared_ptr dss) for (const auto& fd : sockets) { /* allocate one more byte so we can detect truncation */ - // NOLINTNEXTLINE(bugprone-use-after-move): resizing a vector has no preconditions so it is valid to do so after moving it + // NOLINTNEXTLINE(bugprone-use-after-move): resizing a vector has no preconditions so it is valid to do so after moving it response.resize(initialBufferSize + 1); ssize_t got = recv(fd, response.data(), response.size(), 0); diff --git a/pdns/dnsdistdist/doq-common.cc b/pdns/dnsdistdist/doq-common.cc index 4b0b2868f935c..3004d45728f1a 100644 --- a/pdns/dnsdistdist/doq-common.cc +++ b/pdns/dnsdistdist/doq-common.cc @@ -61,6 +61,7 @@ PacketBuffer mintToken(const PacketBuffer& dcid, const ComboAddress& peer) return encryptedTokenPacket; } catch (const std::exception& exp) { + cerr<<"error while minting token "< validateToken(const PacketBuffer& token, const Combo return PacketBuffer(plainText.begin() + (sizeof(ttd) + addrBytes.size()), plainText.end()); } catch (const std::exception& exp) { + cerr<<"error while validating token "<(out.data()), written, peer); + } + catch (const std::exception& exp) { + cerr<<"error sending version neg "<(out.data()), written, peer); + } + catch (const std::exception& exp) { + cerr<<"error sending egress "< d_streamBuffers; std::unordered_map d_streamOutBuffers; + uint64_t d_queries{0}; }; static void sendBackDOQUnit(DOQUnitUniquePtr&& unit, const char* description); @@ -271,6 +272,7 @@ static bool tryWriteResponse(Connection& conn, const uint64_t streamID, PacketBu return false; } if (res < 0) { + cerr<<"error while sending response packet "<(DOQ_Error_Codes::DOQ_INTERNAL_ERROR)); return true; } @@ -283,6 +285,7 @@ static bool tryWriteResponse(Connection& conn, const uint64_t streamID, PacketBu static void handleResponse(DOQFrontend& frontend, Connection& conn, const uint64_t streamID, PacketBuffer& response) { if (response.empty()) { + cerr<<"sending empty response packet "<(DOQ_Error_Codes::DOQ_UNSPECIFIED_ERROR)); return; @@ -382,6 +385,9 @@ static void processDOQQuery(DOQUnitUniquePtr&& doqUnit) { const auto handleImmediateResponse = [](DOQUnitUniquePtr&& unit, [[maybe_unused]] const char* reason) { DEBUGLOG("handleImmediateResponse() reason=" << reason); + if (reason != std::string("DoQ self-answered response")) { + cerr<<"sending immediate response packet "<dsc->df->d_server_config->d_connections, unit->serverConnID); handleResponse(*unit->dsc->df, *conn, unit->streamID, unit->response); unit->ids.doqu.reset(); @@ -402,6 +408,7 @@ static void processDOQQuery(DOQUnitUniquePtr&& doqUnit) if (unit->query.size() < sizeof(dnsheader)) { ++dnsdist::metrics::g_stats.nonCompliantQueries; + cerr<<"non compliant query of size "<query.size()<<" from "<streamID<response.clear(); @@ -536,6 +543,7 @@ static void doq_dispatch_query(DOQServerConfig& dsc, PacketBuffer&& query, const processDOQQuery(std::move(unit)); } catch (const std::exception& exp) { + cerr<<"error while handling DoQ packet "<(DOQ_Error_Codes::DOQ_PROTOCOL_ERROR)); + return; + } + + streamBuffer.resize(existingLength + received); + if (fin) { + break; + } + } + + if (streamBuffer.size() < (sizeof(uint16_t) + sizeof(dnsheader))) { + cerr<<"non compliant query (early) of size "<(DOQ_Error_Codes::DOQ_PROTOCOL_ERROR)); + return; + } + + uint16_t payloadLength = streamBuffer.at(0) * 256 + streamBuffer.at(1); + streamBuffer.erase(streamBuffer.begin(), streamBuffer.begin() + 2); + if (payloadLength != streamBuffer.size()) { + cerr<<"non compliant query of size "<(DOQ_Error_Codes::DOQ_PROTOCOL_ERROR)); + return; + } + ++conn.d_queries; + DEBUGLOG("Dispatching query"); + doq_dispatch_query(*(frontend.d_server_config), std::move(streamBuffer), clientState.local, client, serverConnID, streamID); + conn.d_streamBuffers.erase(streamID); +} + +static void handleSocketReadable(DOQFrontend& frontend, ClientState& clientState, Socket& sock) +{ + while (true) { + DEBUGLOG("Received datagram"); + std::string bufferStr; + ComboAddress client; + if (!sock.recvFromAsync(bufferStr, client) || bufferStr.size() == 0) { + return; + } + + uint32_t version{0}; + uint8_t type{0}; + std::array scid{}; + size_t scid_len = scid.size(); + std::array dcid{}; + size_t dcid_len = dcid.size(); + std::array token{}; + size_t token_len = token.size(); + + // NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast) + auto res = quiche_header_info(reinterpret_cast(bufferStr.data()), bufferStr.size(), LOCAL_CONN_ID_LEN, + &version, &type, + scid.data(), &scid_len, + dcid.data(), &dcid_len, + token.data(), &token_len); + if (res != 0) { + DEBUGLOG("Error in quiche_header_info: " << res); + continue; + } + + // destination connection ID, will have to be sent as original destination connection ID + PacketBuffer serverConnID(dcid.begin(), dcid.begin() + dcid_len); + // source connection ID, will have to be sent as destination connection ID + PacketBuffer clientConnID(scid.begin(), scid.begin() + scid_len); + auto conn = getConnection(frontend.d_server_config->d_connections, serverConnID); + + if (!conn) { + DEBUGLOG("Connection not found"); + if (!quiche_version_is_supported(version)) { + DEBUGLOG("Unsupported version"); + cerr<<"unsupported version"<<(int)version<<" from "<(&client), + client.getSocklen(), + // NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast) + reinterpret_cast(&clientState.local), + clientState.local.getSocklen(), + }; + + // NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast) + auto done = quiche_conn_recv(conn->get().d_conn.get(), reinterpret_cast(bufferStr.data()), bufferStr.size(), &recv_info); + if (done < 0) { + continue; + } + + if (quiche_conn_is_established(conn->get().d_conn.get()) || quiche_conn_is_in_early_data(conn->get().d_conn.get())) { + auto readable = std::unique_ptr(quiche_conn_readable(conn->get().d_conn.get()), quiche_stream_iter_free); + + flushEgress(sock, conn->get().d_conn, client); + + uint64_t streamID = 0; + while (quiche_stream_iter_next(readable.get(), &streamID)) { + handleReadableStream(frontend, clientState, *conn, streamID, client, serverConnID); + } + } + else { + DEBUGLOG("Connection not established"); + cerr<<"connection from "<udpFD); + sock.setNonBlocking(); auto mplexer = std::unique_ptr(FDMultiplexer::getMultiplexerSilent()); @@ -602,120 +765,7 @@ void doqThread(ClientState* clientState) mplexer->getAvailableFDs(readyFDs, 500); if (std::find(readyFDs.begin(), readyFDs.end(), sock.getHandle()) != readyFDs.end()) { - DEBUGLOG("Received datagram"); - std::string bufferStr; - ComboAddress client; - sock.recvFrom(bufferStr, client); - - uint32_t version{0}; - uint8_t type{0}; - std::array scid{}; - size_t scid_len = scid.size(); - std::array dcid{}; - size_t dcid_len = dcid.size(); - std::array token{}; - size_t token_len = token.size(); - - // NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast) - auto res = quiche_header_info(reinterpret_cast(bufferStr.data()), bufferStr.size(), LOCAL_CONN_ID_LEN, - &version, &type, - scid.data(), &scid_len, - dcid.data(), &dcid_len, - token.data(), &token_len); - if (res != 0) { - DEBUGLOG("Error in quiche_header_info: " << res); - continue; - } - - // destination connection ID, will have to be sent as original destination connection ID - PacketBuffer serverConnID(dcid.begin(), dcid.begin() + dcid_len); - // source connection ID, will have to be sent as destination connection ID - PacketBuffer clientConnID(scid.begin(), scid.begin() + scid_len); - auto conn = getConnection(frontend->d_server_config->d_connections, serverConnID); - - if (!conn) { - DEBUGLOG("Connection not found"); - if (!quiche_version_is_supported(version)) { - DEBUGLOG("Unsupported version"); - ++frontend->d_doqUnsupportedVersionErrors; - handleVersionNegociation(sock, clientConnID, serverConnID, client); - continue; - } - - if (token_len == 0) { - /* stateless retry */ - DEBUGLOG("No token received"); - handleStatelessRetry(sock, clientConnID, serverConnID, client, version); - continue; - } - - PacketBuffer tokenBuf(token.begin(), token.begin() + token_len); - auto originalDestinationID = validateToken(tokenBuf, client); - if (!originalDestinationID) { - ++frontend->d_doqInvalidTokensReceived; - DEBUGLOG("Discarding invalid token"); - continue; - } - - DEBUGLOG("Creating a new connection"); - conn = createConnection(*frontend->d_server_config, serverConnID, *originalDestinationID, clientState->local, client); - if (!conn) { - continue; - } - } - DEBUGLOG("Connection found"); - quiche_recv_info recv_info = { - // NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast) - reinterpret_cast(&client), - client.getSocklen(), - // NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast) - reinterpret_cast(&clientState->local), - clientState->local.getSocklen(), - }; - - // NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast) - auto done = quiche_conn_recv(conn->get().d_conn.get(), reinterpret_cast(bufferStr.data()), bufferStr.size(), &recv_info); - if (done < 0) { - continue; - } - - if (quiche_conn_is_established(conn->get().d_conn.get())) { - auto readable = std::unique_ptr(quiche_conn_readable(conn->get().d_conn.get()), quiche_stream_iter_free); - - uint64_t streamID = 0; - while (quiche_stream_iter_next(readable.get(), &streamID)) { - auto& streamBuffer = conn->get().d_streamBuffers[streamID]; - auto existingLength = streamBuffer.size(); - bool fin = false; - streamBuffer.resize(existingLength + 512); - auto received = quiche_conn_stream_recv(conn->get().d_conn.get(), streamID, - &streamBuffer.at(existingLength), 512, - &fin); - streamBuffer.resize(existingLength + received); - if (fin) { - if (streamBuffer.size() < (sizeof(uint16_t) + sizeof(dnsheader))) { - ++dnsdist::metrics::g_stats.nonCompliantQueries; - ++clientState->nonCompliantQueries; - quiche_conn_stream_shutdown(conn->get().d_conn.get(), streamID, QUICHE_SHUTDOWN_WRITE, static_cast(DOQ_Error_Codes::DOQ_PROTOCOL_ERROR)); - break; - } - uint16_t payloadLength = streamBuffer.at(0) * 256 + streamBuffer.at(1); - streamBuffer.erase(streamBuffer.begin(), streamBuffer.begin() + 2); - if (payloadLength != streamBuffer.size()) { - ++dnsdist::metrics::g_stats.nonCompliantQueries; - ++clientState->nonCompliantQueries; - quiche_conn_stream_shutdown(conn->get().d_conn.get(), streamID, QUICHE_SHUTDOWN_WRITE, static_cast(DOQ_Error_Codes::DOQ_PROTOCOL_ERROR)); - break; - } - DEBUGLOG("Dispatching query"); - doq_dispatch_query(*(frontend->d_server_config), std::move(streamBuffer), clientState->local, client, serverConnID, streamID); - conn->get().d_streamBuffers.erase(streamID); - } - } - } - else { - DEBUGLOG("Connection not established"); - } + handleSocketReadable(*frontend, *clientState, sock); } if (std::find(readyFDs.begin(), readyFDs.end(), responseReceiverFD) != readyFDs.end()) { @@ -728,15 +778,15 @@ void doqThread(ClientState* clientState) flushEgress(sock, conn->second.d_conn, conn->second.d_peer); if (quiche_conn_is_closed(conn->second.d_conn.get())) { -#ifdef DEBUGLOG_ENABLED +//#ifdef DEBUGLOG_ENABLED quiche_stats stats; quiche_path_stats path_stats; quiche_conn_stats(conn->second.d_conn.get(), &stats); quiche_conn_path_stats(conn->second.d_conn.get(), 0, &path_stats); - DEBUGLOG("Connection closed, recv=" << stats.recv << " sent=" << stats.sent << " lost=" << stats.lost << " rtt=" << path_stats.rtt << "ns cwnd=" << path_stats.cwnd); -#endif + cerr<<"Connection from "<second.d_peer.toStringWithPort()<<" closed, recv=" << stats.recv << " sent=" << stats.sent << " lost=" << stats.lost << " rtt=" << path_stats.rtt << "ns cwnd=" << path_stats.cwnd<<" queries="<second.d_queries<d_server_config->d_connections.erase(conn); } else { @@ -747,6 +797,7 @@ void doqThread(ClientState* clientState) } } catch (const std::exception& e) { + cerr<<"fatal error"<(bytes)); } - bool recvFromAsync(string &dgram) + bool recvFromAsync(string &dgram, ComboAddress& remote) { - struct sockaddr_in remote; socklen_t remlen = sizeof(remote); ssize_t bytes; d_buffer.resize(s_buflen); @@ -206,8 +205,9 @@ public: //! For datagram sockets, send a datagram to a destination void sendTo(const char* msg, size_t len, const ComboAddress &ep) { - if(sendto(d_socket, msg, len, 0, reinterpret_cast(&ep), ep.getSocklen())<0) + if(sendto(d_socket, msg, len, 0, reinterpret_cast(&ep), ep.getSocklen())<0) { throw NetworkError("After sendto: "+stringerror()); + } } //! For connected datagram sockets, send a datagram