From 927d58fe43da1d0c28a413004a3bc67567051396 Mon Sep 17 00:00:00 2001 From: armstrmi Date: Tue, 21 Dec 2021 15:31:01 -0500 Subject: [PATCH 1/3] updated splitting algorithm for google cloud output operator --- .../builtin/output/googlecloud/request.go | 25 +++++++++++++---- .../output/googlecloud/request_test.go | 28 +++++++++++++++++++ 2 files changed, 47 insertions(+), 6 deletions(-) diff --git a/operator/builtin/output/googlecloud/request.go b/operator/builtin/output/googlecloud/request.go index 1ed64bbdf..885a8a929 100644 --- a/operator/builtin/output/googlecloud/request.go +++ b/operator/builtin/output/googlecloud/request.go @@ -53,13 +53,26 @@ func (g *GoogleRequestBuilder) buildRequests(entries []*logging.LogEntry) []*log } totalEntries := len(request.Entries) - midPoint := totalEntries / 2 - leftEntries := request.Entries[0:midPoint] - rightEntries := request.Entries[midPoint:totalEntries] + firstRequest := g.buildRequest([]*logging.LogEntry{}) + firstSize := 0 + index := 0 - leftRequests := g.buildRequests(leftEntries) - rightRequests := g.buildRequests(rightEntries) - return append(leftRequests, rightRequests...) + for i, entry := range request.Entries { + + firstRequest.Entries = append(firstRequest.Entries, entry) + firstSize = proto.Size(firstRequest) + + if firstSize > g.MaxRequestSize { + index = i + firstRequest.Entries = firstRequest.Entries[0:index] + break + } + } + + secondEntries := request.Entries[index:totalEntries] + secondRequests := g.buildRequests(secondEntries) + + return append([]*logging.WriteLogEntriesRequest{firstRequest}, secondRequests...) } // buildRequest builds a request from the supplied entries diff --git a/operator/builtin/output/googlecloud/request_test.go b/operator/builtin/output/googlecloud/request_test.go index 4f2cdfee6..e22cfa090 100644 --- a/operator/builtin/output/googlecloud/request_test.go +++ b/operator/builtin/output/googlecloud/request_test.go @@ -2,6 +2,7 @@ package googlecloud import ( "errors" + "fmt" "testing" "github.com/observiq/stanza/entry" @@ -11,6 +12,27 @@ import ( "google.golang.org/genproto/googleapis/logging/v2" ) +func BenchmarkBuildRequest(b *testing.B) { + entryBuilder := &MockEntryBuilder{} + entries := []*entry.Entry{} + + for i := 0; i < 1000; i++ { + entry, result := createEntry(i) + entryBuilder.On("Build", entry).Return(result, nil) + entries = append(entries, entry) + } + + requestBuilder := GoogleRequestBuilder{ + MaxRequestSize: 10000, + ProjectID: "test_project", + EntryBuilder: entryBuilder, + SugaredLogger: zap.NewNop().Sugar(), + } + + requests := requestBuilder.Build(entries) + require.Len(b, requests, 2) +} + func TestBuildRequest(t *testing.T) { entryOne := &entry.Entry{Record: "request 1"} entryTwo := &entry.Entry{Record: "request 2"} @@ -90,3 +112,9 @@ func (_m *MockEntryBuilder) Build(_a0 *entry.Entry) (*logging.LogEntry, error) { return r0, r1 } + +func createEntry(num int) (*entry.Entry, *logging.LogEntry) { + entry := &entry.Entry{Record: fmt.Sprintf("request %d", num)} + result := &logging.LogEntry{Payload: &logging.LogEntry_TextPayload{TextPayload: fmt.Sprintf("request %d", num)}} + return entry, result +} From b19c314480da2524e450ecac255058fdb8cfe582 Mon Sep 17 00:00:00 2001 From: armstrmi Date: Mon, 3 Jan 2022 10:00:31 -0500 Subject: [PATCH 2/3] updatewd benchmark --- operator/builtin/output/googlecloud/request_test.go | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/operator/builtin/output/googlecloud/request_test.go b/operator/builtin/output/googlecloud/request_test.go index e22cfa090..de1401d80 100644 --- a/operator/builtin/output/googlecloud/request_test.go +++ b/operator/builtin/output/googlecloud/request_test.go @@ -13,12 +13,14 @@ import ( ) func BenchmarkBuildRequest(b *testing.B) { - entryBuilder := &MockEntryBuilder{} + entryBuilder := &GoogleEntryBuilder{ + MaxEntrySize: defaultMaxEntrySize, + ProjectID: "project", + } entries := []*entry.Entry{} for i := 0; i < 1000; i++ { - entry, result := createEntry(i) - entryBuilder.On("Build", entry).Return(result, nil) + entry, _ := createEntry(i) entries = append(entries, entry) } @@ -30,7 +32,7 @@ func BenchmarkBuildRequest(b *testing.B) { } requests := requestBuilder.Build(entries) - require.Len(b, requests, 2) + require.Len(b, requests, 3) } func TestBuildRequest(t *testing.T) { From 2160416b0793104559cc3769b34d275962e07a76 Mon Sep 17 00:00:00 2001 From: armstrmi Date: Mon, 3 Jan 2022 10:23:18 -0500 Subject: [PATCH 3/3] changed append expression to exclude beginning 0 --- operator/builtin/output/googlecloud/request.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/operator/builtin/output/googlecloud/request.go b/operator/builtin/output/googlecloud/request.go index 885a8a929..1684d44f7 100644 --- a/operator/builtin/output/googlecloud/request.go +++ b/operator/builtin/output/googlecloud/request.go @@ -64,7 +64,7 @@ func (g *GoogleRequestBuilder) buildRequests(entries []*logging.LogEntry) []*log if firstSize > g.MaxRequestSize { index = i - firstRequest.Entries = firstRequest.Entries[0:index] + firstRequest.Entries = firstRequest.Entries[:index] break } }