From 5ca273b1d0137db63b4ee5852d9af1cd3d828338 Mon Sep 17 00:00:00 2001 From: Templum Date: Wed, 3 Mar 2021 17:56:04 +0100 Subject: [PATCH 1/2] :wrench: Trying to address issues with failing integration test --- .github/workflows/ci.yml | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 95073f55..b216b0ac 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -127,5 +127,11 @@ jobs: - name: Await Port listening run: kubectl -n openfaas-fn wait --for=condition=available --timeout=600s deploy/integration + # Sometimes the Integration test failed although port was available, probably Rabbit MQ did not yet finish starting up + - name: Sleep for 30 seconds + uses: jakejarvis/wait-action@master + with: + time: '20s' + - name: Integration Test run: go test --tags=integration ./... \ No newline at end of file From ae0b440dbe2841aafbe3b5623c7a5fee2bfd0e62 Mon Sep 17 00:00:00 2001 From: Simon Date: Mon, 8 Mar 2021 21:32:38 +0100 Subject: [PATCH 2/2] :zap: Change HTTP Client (#87) * :heavy_plus_sign: valyala/fasthttp to leverage as http client * :zap: Using the fasthttp client * Improves Performance and reduces Memory Allocations * :white_check_mark: Adjusted Tests for new http client * :fire: Removed Todo * :white_check_mark: fixed test * Minor --- .vscode/settings.json | 4 +- go.mod | 3 +- go.sum | 28 ++++- main.go | 2 +- main_test.go | 3 +- pkg/openfaas/client.go | 203 ++++++++++++++++++------------------ pkg/openfaas/client_test.go | 34 +++--- pkg/types/http_factory.go | 37 ++++--- 8 files changed, 174 insertions(+), 140 deletions(-) diff --git a/.vscode/settings.json b/.vscode/settings.json index 2fc7336e..fb80d69d 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -6,9 +6,11 @@ "faas", "golint", "nolint", + "nosec", "openfaas", "rabbitmq", "streadway", - "stretchr" + "stretchr", + "valyala" ] } \ No newline at end of file diff --git a/go.mod b/go.mod index b1765825..945b2afd 100644 --- a/go.mod +++ b/go.mod @@ -8,7 +8,8 @@ require ( github.com/streadway/amqp v1.0.0 github.com/stretchr/testify v1.7.0 github.com/testcontainers/testcontainers-go v0.9.0 + github.com/valyala/fasthttp v1.22.0 gopkg.in/yaml.v2 v2.4.0 ) -go 1.13 +go 1.15 diff --git a/go.sum b/go.sum index 1ce05403..3e5a14dd 100644 --- a/go.sum +++ b/go.sum @@ -5,6 +5,8 @@ github.com/Microsoft/go-winio v0.4.11 h1:zoIOcVf0xPN1tnMVbTtEdI+P8OofVk3NObnwOQ6 github.com/Microsoft/go-winio v0.4.11/go.mod h1:VhR8bwka0BXejwEJY73c50VrPtXAaKcyvVC4A4RozmA= github.com/Microsoft/hcsshim v0.8.6 h1:ZfF0+zZeYdzMIVMZHKtDKJvLHj76XCuVae/jNkjj0IA= github.com/Microsoft/hcsshim v0.8.6/go.mod h1:Op3hHsoHPAvb6lceZHDtd9OkTew38wNoXnJs8iY7rUg= +github.com/andybalholm/brotli v1.0.1 h1:KqhlKozYbRtJvsPrrEeXcO+N2l6NYT5A2QAFmSULpEc= +github.com/andybalholm/brotli v1.0.1/go.mod h1:loMXtMfwqflxFJPmdbJO0a3KNoPuLBgiu3qAvBg8x/Y= github.com/cenkalti/backoff v2.2.1+incompatible h1:tNowT99t7UNflLxfYYSlKYsBpXdEet03Pg2g16Swow4= github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= @@ -55,6 +57,8 @@ github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/json-iterator/go v1.1.9/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= +github.com/klauspost/compress v1.11.8 h1:difgzQsp5mdAz9v8lm3P/I+EpDKMU/6uTMw1y1FObuo= +github.com/klauspost/compress v1.11.8/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs= github.com/konsorten/go-windows-terminal-sequences v1.0.1 h1:mweAR1A6xJ3oS2pRaGiHgQ4OO8tzTaLawm8vnODuwDk= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= @@ -108,17 +112,25 @@ github.com/testcontainers/testcontainers-go v0.9.0 h1:ZyftCfROjGrKlxk3MOUn2DAzWr github.com/testcontainers/testcontainers-go v0.9.0/go.mod h1:b22BFXhRbg4PJmeMVWh6ftqjyZHgiIl3w274e9r3C2E= github.com/ugorji/go v1.1.7/go.mod h1:kZn38zHttfInRq0xu/PH0az30d+z6vm202qpg1oXVMw= github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLYF3GoBXY= +github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= +github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= +github.com/valyala/fasthttp v1.22.0 h1:OpwH5KDOJ9cS2bq8fD+KfT4IrksK0llvkHf4MZx42jQ= +github.com/valyala/fasthttp v1.22.0/go.mod h1:0mw2RjXGOzxf4NL2jni3gUQ7LfjjUSiG5sskOUUSEpU= +github.com/valyala/tcplisten v0.0.0-20161114210144-ceec8f93295a/go.mod h1:v3UYOV9WzVtRmSR+PDvWpU/qWl4Wa5LApYYX4ZtKbio= go.uber.org/goleak v1.1.0/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A= golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= -golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2 h1:VklqNMn3ovrHsnt90PveolxSbWFaJdECFbxSq0Mqo2M= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20210220033148-5ea612d1eb83 h1:/ZScEX8SfEmUGRHs0gxpqteO5nfNW6axyZbBdw9A12g= +golang.org/x/crypto v0.0.0-20210220033148-5ea612d1eb83/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I= golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= -golang.org/x/net v0.0.0-20190620200207-3b0461eec859 h1:R/3boaszxrf1GEUWTVDzSKVwLmSJpwZ1yqXm8j0v2QI= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20210226101413-39120d07d75e h1:jIQURUJ9mlLvYwTBtRHm9h58rYhSonLvRvgAnP8Nr7I= +golang.org/x/net v0.0.0-20210226101413-39120d07d75e/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190423024810-112230192c58 h1:8gQV6CLnAEikrhgkHFbMAEhagSSnXWGV915qUMm9mrU= @@ -127,11 +139,19 @@ golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5h golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= -golang.org/x/sys v0.0.0-20200116001909-b77594299b42 h1:vEOn+mP2zCOVzKckCZy6YsCtDblrpj/w7B9nxGNELpg= +golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200116001909-b77594299b42/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210225134936-a50acf3fe073 h1:8qxJSnu+7dRq6upnbntrmriWByIakBuct5OM/MdQC1M= +golang.org/x/sys v0.0.0-20210225134936-a50acf3fe073/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1 h1:v+OssWQX+hTHEmOBgwxdZxK4zHq3yOs8F9J7mk0PY8E= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= -golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.5 h1:i6eZZ+zk0SOf0xgBpEpPD18qWcJda6q1sxt3S0kzyUQ= +golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c h1:fqgJT0MGcGpPgpWU7VRdRjuArfcOvC4AoJmILihzhDg= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/tools v0.0.0-20180810170437-e96c4e24768d/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= diff --git a/main.go b/main.go index 8c02f54c..d50abaf7 100644 --- a/main.go +++ b/main.go @@ -43,7 +43,7 @@ func main() { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - httpClient := types.MakeHTTPClient(conf.InsecureSkipVerify, 60*time.Second) // TODO: Replace with a more performance implementation & establish client pool + httpClient := types.MakeHTTPClient(conf.InsecureSkipVerify, 60*time.Second) // Setup OpenFaaS Controller which is used for querying and more ofSDK := openfaas.NewController(conf, openfaas.NewClient(httpClient, conf.BasicAuth, conf.GatewayURL), openfaas.NewTopicFunctionCache()) go ofSDK.Start(ctx) diff --git a/main_test.go b/main_test.go index 9398b3a8..b51d0e7e 100644 --- a/main_test.go +++ b/main_test.go @@ -91,11 +91,10 @@ func Test_main(t *testing.T) { assert.GreaterOrEqual(t, before.InvocationCount, float64(0), "should be 0 or more") assert.Contains(t, (*before.Annotations)["topic"], TOPIC, "should listend for TOPIC Foo") - httpClient := types.MakeHTTPClient(false, 5*time.Second) publishedMessages := 0 for i := 0; i < 1000; i++ { - err := publishMessage(*httpClient, TOPIC, "Hello World!") + err := publishMessage(http.Client{}, TOPIC, "Hello World!") if err == nil { publishedMessages += 1 } diff --git a/pkg/openfaas/client.go b/pkg/openfaas/client.go index 5f0dbd7b..2e62baea 100644 --- a/pkg/openfaas/client.go +++ b/pkg/openfaas/client.go @@ -6,15 +6,14 @@ package openfaas import ( - "bytes" "context" + "encoding/base64" "encoding/json" "fmt" - "io" - "io/ioutil" - "net/http" + "log" internal "github.com/Templum/rabbitmq-connector/pkg/types" + "github.com/valyala/fasthttp" "github.com/openfaas/faas-provider/auth" "github.com/openfaas/faas-provider/types" @@ -47,14 +46,14 @@ type FunctionCrawler interface { // Client is used for interacting with Open FaaS type Client struct { - client *http.Client + client *fasthttp.Client credentials *auth.BasicAuthCredentials url string } // NewClient creates a new instance of an OpenFaaS Client using // the provided information -func NewClient(client *http.Client, creds *auth.BasicAuthCredentials, gatewayURL string) *Client { +func NewClient(client *fasthttp.Client, creds *auth.BasicAuthCredentials, gatewayURL string) *Client { return &Client{ client: client, credentials: creds, @@ -63,124 +62,119 @@ func NewClient(client *http.Client, creds *auth.BasicAuthCredentials, gatewayURL } // InvokeSync calls a given function in a synchronous way waiting for the response using the provided payload while considering the provided context -func (c *Client) InvokeSync(ctx context.Context, name string, invocation *internal.OpenFaaSInvocation) ([]byte, error) { // TODO: either reuse provided payload or make it parseable +func (c *Client) InvokeSync(ctx context.Context, name string, invocation *internal.OpenFaaSInvocation) ([]byte, error) { functionURL := fmt.Sprintf("%s/function/%s", c.url, name) + req := fasthttp.AcquireRequest() + resp := fasthttp.AcquireResponse() - var body io.Reader + defer fasthttp.ReleaseRequest(req) + defer fasthttp.ReleaseResponse(resp) + + req.SetRequestURI(functionURL) if invocation.Message != nil { - body = bytes.NewReader(*invocation.Message) + req.SetBody(*invocation.Message) } else { - body = nil + req.SetBody(nil) } - req, _ := http.NewRequestWithContext(ctx, http.MethodPost, functionURL, body) req.Header.Set("Content-Type", invocation.ContentType) req.Header.Set("Content-Encoding", invocation.ContentEncoding) - + req.Header.SetUserAgent("OpenFaaS - Rabbit MQ Connector") if c.credentials != nil { - req.SetBasicAuth(c.credentials.User, c.credentials.Password) + credentials := c.credentials.User + ":" + c.credentials.Password + req.Header.Set("Authorization", "Basic "+base64.StdEncoding.EncodeToString([]byte(credentials))) } - if req.Body != nil { - defer req.Body.Close() - } - - res, err := c.client.Do(req) + err := c.client.Do(req, resp) if err != nil { return nil, errors.Wrapf(err, "unable to invoke function %s", name) } - var output []byte - - if res.Body != nil { - defer res.Body.Close() - output, _ = ioutil.ReadAll(res.Body) - } - - switch res.StatusCode { - case 200: - return output, nil - case 401: + switch resp.StatusCode() { + case fasthttp.StatusOK: + return resp.Body(), nil + case fasthttp.StatusUnauthorized: return nil, errors.New("OpenFaaS Credentials are invalid") - case 404: + case fasthttp.StatusNotFound: return nil, errors.New(fmt.Sprintf("Function %s is not deployed", name)) default: - return nil, errors.New(fmt.Sprintf("Received unexpected Status Code %d", res.StatusCode)) + return nil, errors.New(fmt.Sprintf("Received unexpected Status Code %d", resp.StatusCode())) } } // InvokeAsync calls a given function in a asynchronous way waiting for the response using the provided payload while considering the provided context func (c *Client) InvokeAsync(ctx context.Context, name string, invocation *internal.OpenFaaSInvocation) (bool, error) { functionURL := fmt.Sprintf("%s/async-function/%s", c.url, name) + req := fasthttp.AcquireRequest() + resp := fasthttp.AcquireResponse() + + defer fasthttp.ReleaseRequest(req) + defer fasthttp.ReleaseResponse(resp) - var body io.Reader + req.SetRequestURI(functionURL) if invocation.Message != nil { - body = bytes.NewReader(*invocation.Message) + req.SetBody(*invocation.Message) } else { - body = nil + req.SetBody(nil) } - req, _ := http.NewRequestWithContext(ctx, http.MethodPost, functionURL, body) req.Header.Set("Content-Type", invocation.ContentType) req.Header.Set("Content-Encoding", invocation.ContentEncoding) - + req.Header.SetUserAgent("OpenFaaS - Rabbit MQ Connector") if c.credentials != nil { - req.SetBasicAuth(c.credentials.User, c.credentials.Password) + credentials := c.credentials.User + ":" + c.credentials.Password + req.Header.Set("Authorization", "Basic "+base64.StdEncoding.EncodeToString([]byte(credentials))) } - if req.Body != nil { - defer req.Body.Close() - } - - res, err := c.client.Do(req) + err := c.client.Do(req, resp) if err != nil { return false, errors.Wrapf(err, "unable to invoke function %s", name) } - if res.Body != nil { - defer res.Body.Close() - } - - switch res.StatusCode { - case 202: + switch resp.StatusCode() { + case fasthttp.StatusAccepted: return true, nil - case 401: + case fasthttp.StatusUnauthorized: return false, errors.New("OpenFaaS Credentials are invalid") - case 404: + case fasthttp.StatusNotFound: return false, errors.New(fmt.Sprintf("Function %s is not deployed", name)) default: - return false, errors.New(fmt.Sprintf("Received unexpected Status Code %d", res.StatusCode)) + return false, errors.New(fmt.Sprintf("Received unexpected Status Code %d", resp.StatusCode())) } } // HasNamespaceSupport Checks if the version of OpenFaaS does support Namespace func (c *Client) HasNamespaceSupport(ctx context.Context) (bool, error) { getNamespaces := fmt.Sprintf("%s/system/namespaces", c.url) + req := fasthttp.AcquireRequest() + resp := fasthttp.AcquireResponse() + + defer fasthttp.ReleaseRequest(req) + defer fasthttp.ReleaseResponse(resp) + + req.SetRequestURI(getNamespaces) - req, _ := http.NewRequestWithContext(ctx, http.MethodGet, getNamespaces, nil) + req.Header.SetUserAgent("OpenFaaS - Rabbit MQ Connector") if c.credentials != nil { - req.SetBasicAuth(c.credentials.User, c.credentials.Password) + credentials := c.credentials.User + ":" + c.credentials.Password + req.Header.Set("Authorization", "Basic "+base64.StdEncoding.EncodeToString([]byte(credentials))) } - res, err := c.client.Do(req) + err := c.client.Do(req, resp) if err != nil { return false, errors.Wrapf(err, "unable to determine namespace support") } - if res.Body != nil { - defer res.Body.Close() - } - - switch res.StatusCode { - case 200: - resp, _ := ioutil.ReadAll(res.Body) + switch resp.StatusCode() { + case fasthttp.StatusOK: var namespaces []string - _ = json.Unmarshal(resp, &namespaces) + _ = json.Unmarshal(resp.Body(), &namespaces) // Swarm edition of OF does not support namespaces and is simply returning empty array return len(namespaces) > 0, nil - case 401: + case fasthttp.StatusUnauthorized: return false, errors.New("OpenFaaS Credentials are invalid") default: + log.Println(fmt.Sprintf("Received unexpected Status Code %d while fetching namespaces", resp.StatusCode())) return false, nil } } @@ -188,69 +182,74 @@ func (c *Client) HasNamespaceSupport(ctx context.Context) (bool, error) { // GetNamespaces returns all namespaces where Functions are deployed on func (c *Client) GetNamespaces(ctx context.Context) ([]string, error) { getNamespaces := fmt.Sprintf("%s/system/namespaces", c.url) + req := fasthttp.AcquireRequest() + resp := fasthttp.AcquireResponse() + + defer fasthttp.ReleaseRequest(req) + defer fasthttp.ReleaseResponse(resp) + + req.SetRequestURI(getNamespaces) - req, _ := http.NewRequestWithContext(ctx, http.MethodGet, getNamespaces, nil) + req.Header.SetUserAgent("OpenFaaS - Rabbit MQ Connector") if c.credentials != nil { - req.SetBasicAuth(c.credentials.User, c.credentials.Password) + credentials := c.credentials.User + ":" + c.credentials.Password + req.Header.Set("Authorization", "Basic "+base64.StdEncoding.EncodeToString([]byte(credentials))) } - res, err := c.client.Do(req) + err := c.client.Do(req, resp) if err != nil { - return nil, errors.Wrap(err, "unable to obtain namespaces") + return nil, errors.Wrapf(err, "unable to fetch namespaces") } - if res.Body != nil { - defer res.Body.Close() - } - - resp, _ := ioutil.ReadAll(res.Body) - var namespaces []string - err = json.Unmarshal(resp, &namespaces) - - if err != nil { - if res.StatusCode == 401 { - return nil, errors.New("OpenFaaS Credentials are invalid") - } + switch resp.StatusCode() { + case fasthttp.StatusOK: + var namespaces []string + _ = json.Unmarshal(resp.Body(), &namespaces) + // Swarm edition of OF does not support namespaces and is simply returning empty array return namespaces, nil + case fasthttp.StatusUnauthorized: + return nil, errors.New("OpenFaaS Credentials are invalid") + default: + log.Println(fmt.Sprintf("Received unexpected Status Code %d while fetching namespaces", resp.StatusCode())) + return nil, nil } - - return namespaces, nil } // GetFunctions returns a list of all functions in the given namespace or in the default namespace func (c *Client) GetFunctions(ctx context.Context, namespace string) ([]types.FunctionStatus, error) { getFunctions := fmt.Sprintf("%s/system/functions", c.url) + req := fasthttp.AcquireRequest() + resp := fasthttp.AcquireResponse() + + defer fasthttp.ReleaseRequest(req) + defer fasthttp.ReleaseResponse(resp) - req, _ := http.NewRequestWithContext(ctx, http.MethodGet, getFunctions, nil) + req.SetRequestURI(getFunctions) + + req.Header.SetUserAgent("OpenFaaS - Rabbit MQ Connector") if c.credentials != nil { - req.SetBasicAuth(c.credentials.User, c.credentials.Password) + credentials := c.credentials.User + ":" + c.credentials.Password + req.Header.Set("Authorization", "Basic "+base64.StdEncoding.EncodeToString([]byte(credentials))) } if len(namespace) > 0 { - q := req.URL.Query() - q.Add("namespace", namespace) - req.URL.RawQuery = q.Encode() + req.URI().QueryArgs().Add("namespace", namespace) } - res, err := c.client.Do(req) + err := c.client.Do(req, resp) if err != nil { return nil, errors.Wrap(err, "unable to obtain functions") } - if res.Body != nil { - defer res.Body.Close() - } - - resp, _ := ioutil.ReadAll(res.Body) - var functions []types.FunctionStatus - err = json.Unmarshal(resp, &functions) - - if err != nil { - if res.StatusCode == 401 { - return nil, errors.New("OpenFaaS Credentials are invalid") - } - return nil, errors.New(fmt.Sprintf("Received unexpected Status Code %d", res.StatusCode)) + switch resp.StatusCode() { + case fasthttp.StatusOK: + var functions []types.FunctionStatus + _ = json.Unmarshal(resp.Body(), &functions) + // Swarm edition of OF does not support namespaces and is simply returning empty array + return functions, nil + case fasthttp.StatusUnauthorized: + return nil, errors.New("OpenFaaS Credentials are invalid") + default: + return nil, errors.New(fmt.Sprintf("Received unexpected Status Code %d", resp.StatusCode())) } - - return functions, nil } diff --git a/pkg/openfaas/client_test.go b/pkg/openfaas/client_test.go index 13f6f73f..13bb83b5 100644 --- a/pkg/openfaas/client_test.go +++ b/pkg/openfaas/client_test.go @@ -12,14 +12,22 @@ import ( "net/http" "net/http/httptest" "testing" + "time" types2 "github.com/Templum/rabbitmq-connector/pkg/types" + "github.com/valyala/fasthttp" "github.com/openfaas/faas-provider/auth" "github.com/openfaas/faas-provider/types" "github.com/stretchr/testify/assert" ) +func CreateClient(server *httptest.Server) *fasthttp.Client { + client := types2.MakeHTTPClient(true, 30*time.Second) + // TODO: For the future configure client with cert pool from the server + return client +} + func TestClient_InvokeSync(t *testing.T) { expectedResponse := "Hello World" @@ -49,9 +57,9 @@ func TestClient_InvokeSync(t *testing.T) { })) defer server.Close() - openfaasClient := NewClient(server.Client(), nil, server.URL) + openfaasClient := NewClient(CreateClient(server), nil, server.URL) - authenticatedOpenFaaSClient := NewClient(server.Client(), &auth.BasicAuthCredentials{ + authenticatedOpenFaaSClient := NewClient(CreateClient(server), &auth.BasicAuthCredentials{ User: "User", Password: "Invalid", }, server.URL) @@ -133,9 +141,9 @@ func TestClient_InvokeAsync(t *testing.T) { })) defer server.Close() - openfaasClient := NewClient(server.Client(), nil, server.URL) + openfaasClient := NewClient(CreateClient(server), nil, server.URL) - authenticatedOpenFaaSClient := NewClient(server.Client(), &auth.BasicAuthCredentials{ + authenticatedOpenFaaSClient := NewClient(CreateClient(server), &auth.BasicAuthCredentials{ User: "User", Password: "Invalid", }, server.URL) @@ -219,16 +227,16 @@ func TestClient_HasNamespaceSupport(t *testing.T) { _, _ = w.Write(out) })) - ofK8SClient := NewClient(k8sOF.Client(), nil, k8sOF.URL) + ofK8SClient := NewClient(CreateClient(k8sOF), nil, k8sOF.URL) - ofSwarmClient := NewClient(swarmOF.Client(), nil, swarmOF.URL) + ofSwarmClient := NewClient(CreateClient(swarmOF), nil, swarmOF.URL) - authenticatedOpenFaaSClient := NewClient(k8sOF.Client(), &auth.BasicAuthCredentials{ + authenticatedOpenFaaSClient := NewClient(CreateClient(k8sOF), &auth.BasicAuthCredentials{ User: "User", Password: "Invalid", }, k8sOF.URL) - failingOpenFaaSClient := NewClient(k8sOF.Client(), &auth.BasicAuthCredentials{ + failingOpenFaaSClient := NewClient(CreateClient(k8sOF), &auth.BasicAuthCredentials{ User: "User", Password: "Pass", }, k8sOF.URL) @@ -336,9 +344,9 @@ func TestClient_GetFunctions(t *testing.T) { })) defer server.Close() - openfaasClient := NewClient(server.Client(), nil, server.URL) + openfaasClient := NewClient(CreateClient(server), nil, server.URL) - authenticatedOpenFaaSClient := NewClient(server.Client(), &auth.BasicAuthCredentials{ + authenticatedOpenFaaSClient := NewClient(CreateClient(server), &auth.BasicAuthCredentials{ User: "User", Password: "Invalid", }, server.URL) @@ -408,14 +416,14 @@ func TestClient_GetNamespaces(t *testing.T) { })) defer server.Close() - openfaasClient := NewClient(server.Client(), nil, server.URL) + openfaasClient := NewClient(CreateClient(server), nil, server.URL) - authenticatedOpenFaaSClient := NewClient(server.Client(), &auth.BasicAuthCredentials{ + authenticatedOpenFaaSClient := NewClient(CreateClient(server), &auth.BasicAuthCredentials{ User: "User", Password: "Invalid", }, server.URL) - failingOpenFaaSClient := NewClient(server.Client(), &auth.BasicAuthCredentials{ + failingOpenFaaSClient := NewClient(CreateClient(server), &auth.BasicAuthCredentials{ User: "User", Password: "Pass", }, server.URL) diff --git a/pkg/types/http_factory.go b/pkg/types/http_factory.go index 191eb7c3..28eeda64 100644 --- a/pkg/types/http_factory.go +++ b/pkg/types/http_factory.go @@ -7,25 +7,30 @@ package types import ( "crypto/tls" - "net" - "net/http" "time" + + "github.com/valyala/fasthttp" + "github.com/valyala/fasthttp/fasthttpproxy" ) // MakeHTTPClient generates an HTTP Client setting basic properties including timeouts -func MakeHTTPClient(insecure bool, timeout time.Duration) *http.Client { - /* #nosec G402 as default is false*/ - return &http.Client{ - Transport: &http.Transport{ - Proxy: http.ProxyFromEnvironment, - DialContext: (&net.Dialer{ - Timeout: timeout, - KeepAlive: 10 * time.Second, - }).DialContext, - MaxIdleConns: 512, - MaxIdleConnsPerHost: 512, - IdleConnTimeout: 120 * time.Millisecond, - TLSClientConfig: &tls.Config{InsecureSkipVerify: insecure}, - }, +func MakeHTTPClient(insecure bool, timeout time.Duration) *fasthttp.Client { + client := fasthttp.Client{ + Name: "Main_Client", + + NoDefaultUserAgentHeader: false, + + Dial: fasthttpproxy.FasthttpProxyHTTPDialer(), + + ReadTimeout: timeout, + WriteTimeout: timeout, + + MaxIdleConnDuration: 5 * time.Second, + /* #nosec G402 as default is false*/ + TLSConfig: &tls.Config{InsecureSkipVerify: insecure}, + + MaxConnsPerHost: 256, } + + return &client }