-
Notifications
You must be signed in to change notification settings - Fork 170
/
Copy pathupstream.lua
267 lines (213 loc) · 6.62 KB
/
upstream.lua
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
--- @classmod Upstream
-- Abstracts how traffic is forwarded to an upstream server.
--- @usage
--- local upstream = Upstream.new('http://example.com')
--- upstream:rewrite_request() -- set Host header to 'example.com'
--- -- stores itself in the `context` table for later use in the balancer phase and calls `ngx.exec`.
--- upstream:call(context)
local setmetatable = setmetatable
local str_format = string.format
local resty_resolver = require('resty.resolver')
local resty_url = require('resty.url')
local url_helper = require('resty.url_helper')
local http_proxy = require('apicast.http_proxy')
local format = string.format
local cjson = require('cjson')
local _M = {
}
--- Build the value assigned to the `proxy_pass` nginx variable.
--- WebSocket schemes are mapped to their HTTP counterparts (`ws` -> `http`,
--- `wss` -> `https`); any other scheme is used unchanged.
--- @tparam table upstream object providing `uri.scheme` and `upstream_name`
--- @treturn string `<scheme>://<upstream_name>`
local function proxy_pass(upstream)
  local original_scheme = upstream.uri.scheme
  local effective_scheme

  if original_scheme == "ws" then
    effective_scheme = "http"
  elseif original_scheme == "wss" then
    effective_scheme = "https"
  else
    effective_scheme = original_scheme
  end

  return str_format('%s://%s', effective_scheme, upstream.upstream_name)
end
-- Metatable applied to every instance: fields missing on the instance are looked up in `_M`.
local mt = { __index = _M }
--- Create new Upstream instance.
--- The URL is parsed with `url_helper.parse_url`; the parsed result is stored as `uri`.
--- @tparam string url URL of the upstream server
--- @treturn Upstream|nil upstream instance
--- @treturn nil|string error when upstream can't be initialized
--- @static
function _M.new(url)
  local missing = (not url) or url == cjson.null
  if missing then
    return nil, 'Upstream cannot be null'
  end

  local parsed, parse_err = url_helper.parse_url(url)
  if parse_err then
    return nil, 'invalid upstream'
  end

  local instance = {
    uri = parsed,
    resolver = resty_resolver,
    -- @upstream location is defined in apicast.conf
    location_name = '@upstream',
    -- upstream is defined in upstream.conf
    upstream_name = 'upstream',
  }

  return setmetatable(instance, mt)
end
--- Resolve upstream servers.
--- A successful result is stored in `self.servers` and returned directly on later calls
--- without querying the resolver again.
--- @treturn {...}|nil resolved servers returned by the resolver
--- @treturn nil|string error in case resolving fails
function _M:resolve()
  local cached = self.servers
  if cached then
    return cached
  end

  local resolver, uri = self.resolver, self.uri
  if not (resolver and uri) then
    return nil, 'not initialized'
  end

  local servers, err = resolver:instance():get_servers(uri.host, uri)
  if err then
    return nil, err
  end

  self.servers = servers
  return servers
end
-- Request URIs that count as "root": for these only the prefix is used.
local root_uri = {
  [''] = true,
  ['/'] = true,
}

--- Combine a path prefix with the URI of the current request (`ngx.var.uri`).
--- @param prefix path to put in front of the request URI
--- @return `prefix` alone when the request URI is unset, empty or `/`;
--- otherwise the result of `resty_url.join(prefix, request_uri)`
local function prefix_path(prefix)
  local request_uri = ngx.var.uri or ''

  if not root_uri[request_uri] then
    -- keep the result in a local so exactly one value is returned
    local joined = resty_url.join(prefix, request_uri)
    return joined
  end

  return prefix
end
--- Build the Host header value for a parsed URI.
--- The port is appended only when it is set and differs from the port returned by
--- `resty_url.default_port` for the URI scheme.
--- @tparam table uri parsed URI with `host`, `port` and `scheme` fields
--- @return `uri.host`, or `host:port` for a non-default port
local function host_header(uri)
  local port = uri.port
  local is_default = (port == resty_url.default_port(uri.scheme))

  if not port or is_default then
    return uri.host
  end

  return format('%s:%s', uri.host, port)
end
--- Store an explicit Host header value on the instance.
--- `set_host_header` sends this value instead of the host derived from the upstream URI.
--- @param host value to send as the Host header
function _M:use_host_header(host)
  self.host = host
end
--- Replace both the path and the query of the upstream URI.
--- @param path value split by `url_helper.split_path` into a path part and a query part
function _M:set_path(path)
  local new_path, new_query = url_helper.split_path(path)
  local uri = self.uri

  uri.path = new_path
  uri.query = new_query
end
--- Append a path to the path of the upstream URI.
--- The path part of `path` is joined onto the current URI path (`/` when the URI has none).
--- A non-empty query already present on the upstream URI is kept; only when the URI has no
--- query is the query part of `path` adopted.
--- @param path value split by `url_helper.split_path` into a path part and a query part
function _M:append_path(path)
  local tmp_path, tmp_query = url_helper.split_path(path)
  local uri = self.uri

  if not uri.path then
    uri.path = "/"
  end

  if tmp_path ~= "" then
    uri.path = resty_url.join(uri.path, tmp_path)
  end

  -- If query is already present, do not need to add more.
  -- The guard must look at the URI's own query: testing the appended query instead would
  -- drop every appended query and wipe an existing one when the appended path has none.
  local current_query = uri.query
  if current_query and current_query ~= "" then
    return
  end

  uri.query = tmp_query
end
--- Override the location name stored on the instance.
--- A nil or false argument leaves the current `location_name` untouched.
--- @param location_name new location name, or nil to keep the current one
function _M:update_location(location_name)
  if not location_name then
    return
  end

  self.location_name = location_name
end
--- Rewrite request Host header to what is provided in the argument or in the URL.
--- When the upstream URI has a path, the request URI is rewritten to that path prefixed onto
--- the current request URI; when it has a query, the request args are replaced with it.
--- @treturn nil nothing is returned on success
--- @treturn nil|string `'not initialized'` (second value) when the Host header could not be set
function _M:rewrite_request()
  local _, host_err = self:set_host_header()
  if host_err then
    return nil, 'not initialized'
  end

  local target = self.uri

  local path = target.path
  if path then
    ngx.req.set_uri(prefix_path(path))
  end

  local query = target.query
  if query then
    ngx.req.set_uri_args(query)
  end
end
--- Set the `proxy_pass` nginx variable for the upstream and jump to its location.
--- @tparam table self upstream instance
local function exec(self)
  ngx.var.proxy_pass = proxy_pass(self)

  -- the caller can unset the location_name to do own exec/location.capture
  local location = self.location_name
  if not location then
    return
  end

  ngx.exec(location)
end
--- Set the Host header of the current request.
--- An explicit `self.host` takes precedence; otherwise the value is derived from the upstream URI.
--- @treturn string|nil the Host value that was set
--- @treturn nil|string error when neither `self.host` nor `self.uri` is available
function _M:set_host_header()
  local host = self.host

  if not host then
    -- set Host from uri if Host is not defined
    local uri = self.uri
    if not uri then
      return nil, "Upstream URI not initialized"
    end
    host = host_header(uri)
  end

  ngx.req.set_header('Host', host)
  return host, nil
end
--- Flag the instance with `skip_https_connect = true`.
--- `call` sets this when the context has `skip_https_connect_on_proxy` and a proxy is used.
function _M:set_skip_https_connect_on_proxy()
  self.skip_https_connect = true
end
--- Set the `upstream_keepalive_key` nginx variable for TLS upstreams.
--- Only applies to the `https` and `wss` schemes; the key is `<host>::<service id>`, with an
--- empty service id when the context carries no `service.id`.
--- @tparam table|nil context table that may hold a `service` table with an `id`
function _M:set_keepalive_key(context)
  local scheme = self.uri.scheme
  if scheme ~= "https" and scheme ~= "wss" then
    return
  end

  local service = (context or {}).service or {}
  local service_id = service.id or ""

  ngx.var.upstream_keepalive_key = self.uri.host .. "::" .. service_id
  ngx.log(ngx.DEBUG, "keepalive key for https connection set to: '", ngx.var.upstream_keepalive_key, "'")
end
--- Pick the location name requested through the context, if any.
--- An explicit `upstream_location_name` wins; otherwise `request_unbuffered` selects the
--- unbuffered location. Nothing is returned when neither is set.
--- @tparam table context table that may hold `upstream_location_name` and `request_unbuffered`
--- @treturn string|nil location name
local function get_upstream_location_name(context)
  local explicit_name = context.upstream_location_name

  if not explicit_name then
    if context.request_unbuffered then
      return "@upstream_request_unbuffered"
    end
    return
  end

  return explicit_name
end
--- Execute the upstream.
--- Looks up a proxy for the upstream (through `context.get_http_proxy` when present, otherwise
--- `http_proxy.find`). With a proxy, the request is handed to `http_proxy.request`; without
--- one, the request is rewritten for the upstream. Afterwards the keepalive key is set, the
--- servers are resolved if needed, the instance is stored in `context` and the location is executed.
--- @tparam table context any table (policy context, ngx.ctx) to store the upstream for later use by balancer
--- @treturn nil
--- @treturn nil|string `'response sent already'` when the response headers were already sent
function _M:call(context)
  if ngx.headers_sent then return nil, 'response sent already' end

  local proxy_uri

  -- get_http_proxy is a property set by the http_proxy policy
  if context.get_http_proxy then
    proxy_uri = context.get_http_proxy(self.uri)
  else
    proxy_uri = http_proxy.find(self)
  end

  if proxy_uri then
    ngx.log(ngx.DEBUG, 'using proxy: ', proxy_uri)
    -- https requests will be terminated, http will be rewritten and sent
    -- to a proxy
    if context.skip_https_connect_on_proxy then
      self:set_skip_https_connect_on_proxy();
    end

    -- copy the per-request settings from the context onto the instance
    -- before it is passed to http_proxy.request
    self.request_unbuffered = context.request_unbuffered
    self.upstream_connection_opts = context.upstream_connection_opts
    self.upstream_ssl = {
      ssl_verify = context.upstream_verify,
      ssl_client_cert = context.upstream_certificate,
      ssl_client_priv_key = context.upstream_key
    }
    http_proxy.request(self, proxy_uri)
  else
    -- rewrite_request reports failure as `nil, err`, so the error is the SECOND
    -- return value; taking only the first one would silently drop it.
    local _, err = self:rewrite_request()
    if err then
      ngx.log(ngx.WARN, "Upstream rewrite request failed:", err)
    end
  end

  self:set_keepalive_key(context or {})
  if not self.servers then self:resolve() end

  local upstream_location_name = get_upstream_location_name(context)
  self:update_location(upstream_location_name)

  -- store the instance under its upstream name for later use by the balancer
  context[self.upstream_name] = self
  return exec(self)
end
--- Store an owner id on the instance.
--- @param owner_id value kept in `self.owner_id`
function _M:set_owner_id(owner_id)
  self.owner_id = owner_id
end
--- Return the owner id stored on the instance.
--- The stored value itself is returned (not a strict boolean); it is nil when none was set.
--- @return the value of `self.owner_id`
function _M:has_owner_id()
  return self.owner_id
end
return _M