Skip to content

Commit

Permalink
feat!: new proto (flagd.sync.v1) for sync sources (open-feature#1214)
Browse files Browse the repository at this point in the history
## This PR
- update mockgen
- update proto in core
- update to go 1.21

---------

Signed-off-by: RealAnna <[email protected]>
Signed-off-by: Todd Baert <[email protected]>
Signed-off-by: Kavindu Dodanduwa <[email protected]>
Signed-off-by: RealAnna <[email protected]>
Co-authored-by: Todd Baert <[email protected]>
Co-authored-by: Kavindu Dodanduwa <[email protected]>
  • Loading branch information
3 people authored Feb 20, 2024
1 parent 4add9fd commit 544234e
Show file tree
Hide file tree
Showing 16 changed files with 122 additions and 675 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/build.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ on:
- "docs/**"

env:
GO_VERSION: '1.20'
GO_VERSION: '~1.21'

jobs:
lint:
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/release-please.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ env:
PUBLISHABLE_ITEMS: '["flagd","flagd-proxy"]'
REGISTRY: ghcr.io
REPO_OWNER: ${{ github.repository_owner }}
DEFAULT_GO_VERSION: '1.20'
DEFAULT_GO_VERSION: '~1.21'
PUBLIC_KEY_FILE: publicKey.pub
GOPRIVATE: buf.build/gen/go

Expand Down
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -66,12 +66,12 @@ lint:
go install -v github.com/golangci/golangci-lint/cmd/[email protected]
$(foreach module, $(ALL_GO_MOD_DIRS), ${GOPATH}/bin/golangci-lint run --deadline=5m --timeout=5m $(module)/... || exit;)
install-mockgen:
go install github.com/golang/mock/mockgen@v1.6.0
go install go.uber.org/mock/mockgen@v0.4.0
mockgen: install-mockgen
cd core; mockgen -source=pkg/sync/http/http_sync.go -destination=pkg/sync/http/mock/http.go -package=syncmock
cd core; mockgen -source=pkg/sync/grpc/grpc_sync.go -destination=pkg/sync/grpc/mock/grpc.go -package=grpcmock
cd core; mockgen -source=pkg/sync/grpc/credentials/builder.go -destination=pkg/sync/grpc/credentials/mock/builder.go -package=credendialsmock
cd core; mockgen -source=pkg/eval/ievaluator.go -destination=pkg/eval/mock/ievaluator.go -package=evalmock
cd core; mockgen -source=pkg/evaluator/ievaluator.go -destination=pkg/eval/mock/ievaluator.go -package=evalmock
cd core; mockgen -source=pkg/service/middleware/interface.go -destination=pkg/service/middleware/mock/interface.go -package=middlewaremock
cd core; mockgen -source=pkg/sync/builder/syncbuilder.go -destination=pkg/sync/builder/mock/syncbuilder.go -package=middlewaremocksyncbuildermock
generate-docs:
Expand Down
6 changes: 4 additions & 2 deletions core/go.mod
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
module github.com/open-feature/flagd/core

go 1.20
go 1.21

toolchain go1.21.4

require (
buf.build/gen/go/open-feature/flagd/connectrpc/go v1.15.0-20240215170432-1e611e2999cc.1
buf.build/gen/go/open-feature/flagd/grpc/go v1.3.0-20231031123731-ac2ec0f39838.2
buf.build/gen/go/open-feature/flagd/grpc/go v1.3.0-20240215170432-1e611e2999cc.2
buf.build/gen/go/open-feature/flagd/protocolbuffers/go v1.32.0-20240215170432-1e611e2999cc.1
connectrpc.com/connect v1.15.0
connectrpc.com/otelconnect v0.7.0
Expand Down
71 changes: 12 additions & 59 deletions core/go.sum

Large diffs are not rendered by default.

50 changes: 8 additions & 42 deletions core/pkg/sync/grpc/grpc_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ import (
msync "sync"
"time"

"buf.build/gen/go/open-feature/flagd/grpc/go/sync/v1/syncv1grpc"
v1 "buf.build/gen/go/open-feature/flagd/protocolbuffers/go/sync/v1"
"buf.build/gen/go/open-feature/flagd/grpc/go/flagd/sync/v1/syncv1grpc"
v1 "buf.build/gen/go/open-feature/flagd/protocolbuffers/go/flagd/sync/v1"
"github.com/open-feature/flagd/core/pkg/logger"
"github.com/open-feature/flagd/core/pkg/sync"
grpccredential "github.com/open-feature/flagd/core/pkg/sync/grpc/credentials"
Expand Down Expand Up @@ -168,7 +168,6 @@ func (g *Sync) connectWithRetry(

// handleFlagSync wraps the stream listening and push updates through dataSync channel
func (g *Sync) handleFlagSync(stream syncv1grpc.FlagSyncService_SyncFlagsClient, dataSync chan<- sync.DataSync) error {
// Set ready state once only
once.Do(func() {
g.ready = true
})
Expand All @@ -179,45 +178,12 @@ func (g *Sync) handleFlagSync(stream syncv1grpc.FlagSyncService_SyncFlagsClient,
return fmt.Errorf("error receiving payload from stream: %w", err)
}

switch data.State {
case v1.SyncState_SYNC_STATE_ALL:
dataSync <- sync.DataSync{
FlagData: data.FlagConfiguration,
Source: g.URI,
Type: sync.ALL,
}

g.Logger.Debug("received full configuration payload")
case v1.SyncState_SYNC_STATE_ADD:
dataSync <- sync.DataSync{
FlagData: data.FlagConfiguration,
Source: g.URI,
Type: sync.ADD,
}

g.Logger.Debug("received an add payload")
case v1.SyncState_SYNC_STATE_UPDATE:
dataSync <- sync.DataSync{
FlagData: data.FlagConfiguration,
Source: g.URI,
Type: sync.UPDATE,
}

g.Logger.Debug("received an update payload")
case v1.SyncState_SYNC_STATE_DELETE:
dataSync <- sync.DataSync{
FlagData: data.FlagConfiguration,
Source: g.URI,
Type: sync.DELETE,
}

g.Logger.Debug("received a delete payload")
case v1.SyncState_SYNC_STATE_PING:
g.Logger.Debug("received server ping")
case v1.SyncState_SYNC_STATE_UNSPECIFIED:
g.Logger.Debug("received unspecified state")
default:
g.Logger.Debug(fmt.Sprintf("received unknown state: %s", data.State.String()))
dataSync <- sync.DataSync{
FlagData: data.FlagConfiguration,
Source: g.URI,
Type: sync.ALL,
}

g.Logger.Debug("received full configuration payload")
}
}
121 changes: 6 additions & 115 deletions core/pkg/sync/grpc/grpc_sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ import (
"testing"
"time"

"buf.build/gen/go/open-feature/flagd/grpc/go/sync/v1/syncv1grpc"
v1 "buf.build/gen/go/open-feature/flagd/protocolbuffers/go/sync/v1"
"buf.build/gen/go/open-feature/flagd/grpc/go/flagd/sync/v1/syncv1grpc"
v1 "buf.build/gen/go/open-feature/flagd/protocolbuffers/go/flagd/sync/v1"
"github.com/golang/mock/gomock"
"github.com/open-feature/flagd/core/pkg/logger"
"github.com/open-feature/flagd/core/pkg/sync"
Expand Down Expand Up @@ -194,7 +194,6 @@ func TestSync_BasicFlagSyncStates(t *testing.T) {
clientResponse.EXPECT().Recv().Return(
&v1.SyncFlagsResponse{
FlagConfiguration: "{}",
State: v1.SyncState_SYNC_STATE_ALL,
},
nil,
),
Expand All @@ -206,66 +205,6 @@ func TestSync_BasicFlagSyncStates(t *testing.T) {
want: sync.ALL,
ready: true,
},
{
name: "State Add maps to Sync Add",
setup: func(t *testing.T, client *grpcmock.MockFlagSyncServiceClient, clientResponse *grpcmock.MockFlagSyncServiceClientResponse) {
client.EXPECT().SyncFlags(gomock.Any(), gomock.Any(), gomock.Any()).Return(clientResponse, nil)
gomock.InOrder(
clientResponse.EXPECT().Recv().Return(
&v1.SyncFlagsResponse{
FlagConfiguration: "{}",
State: v1.SyncState_SYNC_STATE_ADD,
},
nil,
),
clientResponse.EXPECT().Recv().Return(
nil, io.EOF,
),
)
},
want: sync.ADD,
ready: true,
},
{
name: "State Update maps to Sync Update",
setup: func(t *testing.T, client *grpcmock.MockFlagSyncServiceClient, clientResponse *grpcmock.MockFlagSyncServiceClientResponse) {
client.EXPECT().SyncFlags(gomock.Any(), gomock.Any(), gomock.Any()).Return(clientResponse, nil)
gomock.InOrder(
clientResponse.EXPECT().Recv().Return(
&v1.SyncFlagsResponse{
FlagConfiguration: "{}",
State: v1.SyncState_SYNC_STATE_UPDATE,
},
nil,
),
clientResponse.EXPECT().Recv().Return(
nil, io.EOF,
),
)
},
want: sync.UPDATE,
ready: true,
},
{
name: "State Delete maps to Sync Delete",
setup: func(t *testing.T, client *grpcmock.MockFlagSyncServiceClient, clientResponse *grpcmock.MockFlagSyncServiceClientResponse) {
client.EXPECT().SyncFlags(gomock.Any(), gomock.Any(), gomock.Any()).Return(clientResponse, nil)
gomock.InOrder(
clientResponse.EXPECT().Recv().Return(
&v1.SyncFlagsResponse{
FlagConfiguration: "{}",
State: v1.SyncState_SYNC_STATE_DELETE,
},
nil,
),
clientResponse.EXPECT().Recv().Return(
nil, io.EOF,
),
)
},
want: sync.DELETE,
ready: true,
},
{
name: "Error during flag sync",
setup: func(t *testing.T, client *grpcmock.MockFlagSyncServiceClient, clientResponse *grpcmock.MockFlagSyncServiceClientResponse) {
Expand Down Expand Up @@ -332,7 +271,6 @@ func Test_StreamListener(t *testing.T) {
input: []serverPayload{
{
flags: "{\"flags\": {}}",
state: v1.SyncState_SYNC_STATE_ALL,
},
},
output: []sync.DataSync{
Expand All @@ -347,64 +285,16 @@ func Test_StreamListener(t *testing.T) {
input: []serverPayload{
{
flags: "{}",
state: v1.SyncState_SYNC_STATE_ALL,
},
{
flags: "{\"flags\": {}}",
state: v1.SyncState_SYNC_STATE_DELETE,
},
},
output: []sync.DataSync{
{
FlagData: "{}",
Type: sync.ALL,
},
{
FlagData: "{\"flags\": {}}",
Type: sync.DELETE,
},
},
},
{
name: "Pings are ignored & not written to channel",
input: []serverPayload{
{
flags: "",
state: v1.SyncState_SYNC_STATE_PING,
},
{
flags: "",
state: v1.SyncState_SYNC_STATE_PING,
},
{
flags: "{\"flags\": {}}",
state: v1.SyncState_SYNC_STATE_DELETE,
},
},
output: []sync.DataSync{
{
FlagData: "{\"flags\": {}}",
Type: sync.DELETE,
},
},
},
{
name: "Unknown states are & not written to channel",
input: []serverPayload{
{
flags: "",
state: 42,
},
{
flags: "",
state: -1,
},
{
flags: "{\"flags\": {}}",
state: v1.SyncState_SYNC_STATE_ALL,
},
},
output: []sync.DataSync{
{
FlagData: "{\"flags\": {}}",
Type: sync.ALL,
Expand Down Expand Up @@ -557,7 +447,6 @@ func Test_SyncRetry(t *testing.T) {
bServer := bufferedServer{listener: bufListener, mockResponses: []serverPayload{
{
flags: "{}",
state: v1.SyncState_SYNC_STATE_ALL,
},
}}

Expand Down Expand Up @@ -648,7 +537,6 @@ func serve(bServer *bufferedServer) {

type serverPayload struct {
flags string
state v1.SyncState
}

// bufferedServer - a mock grpc service backed by buffered connection
Expand All @@ -663,7 +551,6 @@ func (b *bufferedServer) SyncFlags(_ *v1.SyncFlagsRequest, stream syncv1grpc.Fla
for _, response := range b.mockResponses {
err := stream.Send(&v1.SyncFlagsResponse{
FlagConfiguration: response.flags,
State: response.state,
})
if err != nil {
fmt.Printf("Error with stream: %s", err.Error())
Expand All @@ -677,3 +564,7 @@ func (b *bufferedServer) SyncFlags(_ *v1.SyncFlagsRequest, stream syncv1grpc.Fla
func (b *bufferedServer) FetchAllFlags(_ context.Context, _ *v1.FetchAllFlagsRequest) (*v1.FetchAllFlagsResponse, error) {
return b.fetchAllFlagsResponse, b.fetchAllFlagsError
}

func (b *bufferedServer) GetMetadata(_ context.Context, _ *v1.GetMetadataRequest) (*v1.GetMetadataResponse, error) {
return &v1.GetMetadataResponse{}, nil
}
24 changes: 22 additions & 2 deletions core/pkg/sync/grpc/mock/grpc.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 544234e

Please sign in to comment.