Skip to content

Commit

Permalink
Add keepAliveChecker function
Browse files Browse the repository at this point in the history
  • Loading branch information
bjee19 committed Dec 11, 2024
1 parent 513351e commit a2ace83
Show file tree
Hide file tree
Showing 5 changed files with 280 additions and 180 deletions.
8 changes: 4 additions & 4 deletions internal/mode/static/nginx/config/generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,9 +134,9 @@ func (g GeneratorImpl) executeConfigTemplates(
fileBytes := make(map[string][]byte)

httpUpstreams := g.createUpstreams(conf.Upstreams, upstreamsettings.NewProcessor())
upstreamMap := g.createUpstreamMap(httpUpstreams)
keepAliveCheck := newKeepAliveChecker(httpUpstreams)

for _, execute := range g.getExecuteFuncs(generator, httpUpstreams, upstreamMap) {
for _, execute := range g.getExecuteFuncs(generator, httpUpstreams, keepAliveCheck) {
results := execute(conf)
for _, res := range results {
fileBytes[res.dest] = append(fileBytes[res.dest], res.data...)
Expand Down Expand Up @@ -164,12 +164,12 @@ func (g GeneratorImpl) executeConfigTemplates(
func (g GeneratorImpl) getExecuteFuncs(
generator policies.Generator,
upstreams []http.Upstream,
upstreamMap UpstreamMap,
keepAliveCheck keepAliveChecker,
) []executeFunc {
return []executeFunc{
executeMainConfig,
executeBaseHTTPConfig,
g.newExecuteServersFunc(generator, upstreamMap),
g.newExecuteServersFunc(generator, keepAliveCheck),
newExecuteUpstreamsFunc(upstreams),
executeSplitClients,
executeMaps,
Expand Down
51 changes: 31 additions & 20 deletions internal/mode/static/nginx/config/servers.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,18 +43,21 @@ var httpUpgradeHeader = http.Header{
Value: "$http_upgrade",
}

func (g GeneratorImpl) newExecuteServersFunc(generator policies.Generator, um UpstreamMap) executeFunc {
func (g GeneratorImpl) newExecuteServersFunc(
generator policies.Generator,
keepAliveCheck keepAliveChecker,
) executeFunc {
return func(configuration dataplane.Configuration) []executeResult {
return g.executeServers(configuration, generator, um)
return g.executeServers(configuration, generator, keepAliveCheck)
}
}

func (g GeneratorImpl) executeServers(
conf dataplane.Configuration,
generator policies.Generator,
um UpstreamMap,
keepAliveCheck keepAliveChecker,
) []executeResult {
servers, httpMatchPairs := createServers(conf, generator, um)
servers, httpMatchPairs := createServers(conf, generator, keepAliveCheck)

serverConfig := http.ServerConfig{
Servers: servers,
Expand Down Expand Up @@ -104,7 +107,7 @@ func getIPFamily(baseHTTPConfig dataplane.BaseHTTPConfig) shared.IPFamily {
func createServers(
conf dataplane.Configuration,
generator policies.Generator,
um UpstreamMap,
keepAliveCheck keepAliveChecker,
) ([]http.Server, httpMatchPairs) {
servers := make([]http.Server, 0, len(conf.HTTPServers)+len(conf.SSLServers))
finalMatchPairs := make(httpMatchPairs)
Expand All @@ -116,15 +119,15 @@ func createServers(

for idx, s := range conf.HTTPServers {
serverID := fmt.Sprintf("%d", idx)
httpServer, matchPairs := createServer(s, serverID, generator, um)
httpServer, matchPairs := createServer(s, serverID, generator, keepAliveCheck)
servers = append(servers, httpServer)
maps.Copy(finalMatchPairs, matchPairs)
}

for idx, s := range conf.SSLServers {
serverID := fmt.Sprintf("SSL_%d", idx)

sslServer, matchPairs := createSSLServer(s, serverID, generator, um)
sslServer, matchPairs := createSSLServer(s, serverID, generator, keepAliveCheck)
if _, portInUse := sharedTLSPorts[s.Port]; portInUse {
sslServer.Listen = getSocketNameHTTPS(s.Port)
sslServer.IsSocket = true
Expand All @@ -140,7 +143,7 @@ func createSSLServer(
virtualServer dataplane.VirtualServer,
serverID string,
generator policies.Generator,
um UpstreamMap,
keepAliveCheck keepAliveChecker,
) (http.Server, httpMatchPairs) {
listen := fmt.Sprint(virtualServer.Port)
if virtualServer.IsDefault {
Expand All @@ -150,7 +153,7 @@ func createSSLServer(
}, nil
}

locs, matchPairs, grpc := createLocations(&virtualServer, serverID, generator, um)
locs, matchPairs, grpc := createLocations(&virtualServer, serverID, generator, keepAliveCheck)

server := http.Server{
ServerName: virtualServer.Hostname,
Expand Down Expand Up @@ -179,7 +182,7 @@ func createServer(
virtualServer dataplane.VirtualServer,
serverID string,
generator policies.Generator,
um UpstreamMap,
keepAliveCheck keepAliveChecker,
) (http.Server, httpMatchPairs) {
listen := fmt.Sprint(virtualServer.Port)

Expand All @@ -190,7 +193,7 @@ func createServer(
}, nil
}

locs, matchPairs, grpc := createLocations(&virtualServer, serverID, generator, um)
locs, matchPairs, grpc := createLocations(&virtualServer, serverID, generator, keepAliveCheck)

server := http.Server{
ServerName: virtualServer.Hostname,
Expand Down Expand Up @@ -226,7 +229,7 @@ func createLocations(
server *dataplane.VirtualServer,
serverID string,
generator policies.Generator,
um UpstreamMap,
keepAliveCheck keepAliveChecker,
) ([]http.Location, httpMatchPairs, bool) {
maxLocs, pathsAndTypes := getMaxLocationCountAndPathMap(server.PathRules)
locs := make([]http.Location, 0, maxLocs)
Expand Down Expand Up @@ -255,7 +258,15 @@ func createLocations(

if !needsInternalLocations(rule) {
for _, r := range rule.MatchRules {
extLocations = updateLocations(r.Filters, extLocations, r, server.Port, rule.Path, rule.GRPC, um)
extLocations = updateLocations(
r.Filters,
extLocations,
r,
server.Port,
rule.Path,
rule.GRPC,
keepAliveCheck,
)
}

locs = append(locs, extLocations...)
Expand All @@ -277,7 +288,7 @@ func createLocations(
server.Port,
rule.Path,
rule.GRPC,
um,
keepAliveCheck,
)

internalLocations = append(internalLocations, intLocation)
Expand Down Expand Up @@ -414,7 +425,7 @@ func updateLocation(
listenerPort int32,
path string,
grpc bool,
um UpstreamMap,
keepAliveCheck keepAliveChecker,
) http.Location {
if filters.InvalidFilter != nil {
location.Return = &http.Return{Code: http.StatusInternalServerError}
Expand All @@ -436,7 +447,7 @@ func updateLocation(
extraHeaders = append(extraHeaders, grpcAuthorityHeader)
} else {
extraHeaders = append(extraHeaders, httpUpgradeHeader)
extraHeaders = append(extraHeaders, getConnectionHeader(um, matchRule.BackendGroup.Backends))
extraHeaders = append(extraHeaders, getConnectionHeader(keepAliveCheck, matchRule.BackendGroup.Backends))
}

proxySetHeaders := generateProxySetHeaders(&matchRule.Filters, createBaseProxySetHeaders(extraHeaders...))
Expand Down Expand Up @@ -476,12 +487,12 @@ func updateLocations(
listenerPort int32,
path string,
grpc bool,
um UpstreamMap,
keepAliveCheck keepAliveChecker,
) []http.Location {
updatedLocations := make([]http.Location, len(buildLocations))

for i, loc := range buildLocations {
updatedLocations[i] = updateLocation(filters, loc, matchRule, listenerPort, path, grpc, um)
updatedLocations[i] = updateLocation(filters, loc, matchRule, listenerPort, path, grpc, keepAliveCheck)
}

return updatedLocations
Expand Down Expand Up @@ -890,9 +901,9 @@ func createBaseProxySetHeaders(extraHeaders ...http.Header) []http.Header {
return baseHeaders
}

func getConnectionHeader(um UpstreamMap, backends []dataplane.Backend) http.Header {
func getConnectionHeader(keepAliveCheck keepAliveChecker, backends []dataplane.Backend) http.Header {
for _, backend := range backends {
if um.keepAliveEnabled(backend.UpstreamName) {
if keepAliveCheck(backend.UpstreamName) {
// if keep-alive settings are enabled on any upstream, the connection header value
// must be empty for the location
return unsetHTTPConnectionHeader
Expand Down
Loading

0 comments on commit a2ace83

Please sign in to comment.