Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added readiness support in server health-check #351

Merged
merged 1 commit into from
Jun 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions common/container/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import (

const (
// maxGrpcFrameSize is 256 MiB (256 * 1024 * 1024 bytes).
// NOTE(review): presumably the upper bound on gRPC frame/message size; the
// use site is not visible in this hunk — confirm.
maxGrpcFrameSize = 256 * 1024 * 1024

// ReadinessProbeService is the gRPC health-check service name used to
// report server readiness. The shard assignment dispatcher marks this
// service as SERVING once it receives its first shard-assignment update.
// The Helm readiness probe queries the same name via the literal flag
// `--service=oxia-readiness`, so the chart must be updated if this value
// ever changes.
ReadinessProbeService = "oxia-readiness"
)

type GrpcServer interface {
Expand Down
11 changes: 11 additions & 0 deletions deploy/charts/oxia-cluster/templates/_helpers.tpl
Original file line number Diff line number Diff line change
Expand Up @@ -58,3 +58,14 @@ exec:
initialDelaySeconds: 10
timeoutSeconds: 10
{{- end }}


{{/*
Readiness probe: execs `oxia health` against the port passed as the template
argument (`.`), querying the "oxia-readiness" health service. The service name
must match the ReadinessProbeService constant in common/container/container.go.
*/}}
{{- define "oxia-cluster.readiness-probe" -}}
exec:
command: ["oxia", "health", "--port={{ . }}", "--service=oxia-readiness"]
initialDelaySeconds: 10
timeoutSeconds: 10
{{- end }}
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ spec:
livenessProbe:
{{- include "oxia-cluster.probe" .Values.server.ports.internal | nindent 12 }}
readinessProbe:
{{- include "oxia-cluster.probe" .Values.server.ports.internal | nindent 12 }}
{{- include "oxia-cluster.readiness-probe" .Values.server.ports.internal | nindent 12 }}
volumeClaimTemplates:
- metadata:
name: data
Expand Down
17 changes: 13 additions & 4 deletions server/assignment_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,15 @@ import (
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/health"
"google.golang.org/grpc/health/grpc_health_v1"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/peer"
"google.golang.org/grpc/status"
pb "google.golang.org/protobuf/proto"
"io"
"oxia/common"
"oxia/common/container"
"oxia/common/metrics"
"oxia/proto"
"oxia/server/util"
Expand All @@ -50,6 +53,7 @@ type shardAssignmentDispatcher struct {
clients map[int64]chan *proto.ShardAssignments
nextClientId int64
standalone bool
healthServer *health.Server

ctx context.Context
cancel context.CancelFunc
Expand Down Expand Up @@ -209,6 +213,10 @@ func (s *shardAssignmentDispatcher) PushShardAssignments(stream proto.OxiaCoordi
}

func (s *shardAssignmentDispatcher) updateShardAssignment(assignments *proto.ShardAssignments) error {
// Once we receive the first update of the shards mapping, this service can be
// considered "ready" and it will be able to respond to service discovery requests
s.healthServer.SetServingStatus(container.ReadinessProbeService, grpc_health_v1.HealthCheckResponse_SERVING)

s.Lock()
defer s.Unlock()

Expand All @@ -230,10 +238,11 @@ func (s *shardAssignmentDispatcher) updateShardAssignment(assignments *proto.Sha
return nil
}

func NewShardAssignmentDispatcher() ShardAssignmentsDispatcher {
func NewShardAssignmentDispatcher(healthServer *health.Server) ShardAssignmentsDispatcher {
s := &shardAssignmentDispatcher{
assignments: nil,
clients: make(map[int64]chan *proto.ShardAssignments),
assignments: nil,
healthServer: healthServer,
clients: make(map[int64]chan *proto.ShardAssignments),
log: log.With().
Str("component", "shard-assignment-dispatcher").
Logger(),
Expand All @@ -254,7 +263,7 @@ func NewShardAssignmentDispatcher() ShardAssignmentsDispatcher {
}

func NewStandaloneShardAssignmentDispatcher(numShards uint32) ShardAssignmentsDispatcher {
assignmentDispatcher := NewShardAssignmentDispatcher().(*shardAssignmentDispatcher)
assignmentDispatcher := NewShardAssignmentDispatcher(health.NewServer()).(*shardAssignmentDispatcher)
assignmentDispatcher.standalone = true
res := &proto.ShardAssignments{
Namespaces: map[string]*proto.NamespaceShardsAssignment{
Expand Down
58 changes: 54 additions & 4 deletions server/assignment_dispatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,23 @@
package server

import (
"context"
"github.com/stretchr/testify/assert"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/health"
"google.golang.org/grpc/health/grpc_health_v1"
"google.golang.org/grpc/status"
"math"
"oxia/common"
"oxia/common/container"
"oxia/proto"
"sync"
"testing"
"time"
)

func TestUninitializedAssignmentDispatcher(t *testing.T) {
dispatcher := NewShardAssignmentDispatcher()
dispatcher := NewShardAssignmentDispatcher(health.NewServer())
mockClient := newMockShardAssignmentClientStream()
assert.False(t, dispatcher.Initialized())
req := &proto.ShardAssignmentsRequest{Namespace: common.DefaultNamespace}
Expand All @@ -35,7 +41,7 @@ func TestUninitializedAssignmentDispatcher(t *testing.T) {
}

func TestShardAssignmentDispatcher_Initialized(t *testing.T) {
dispatcher := NewShardAssignmentDispatcher()
dispatcher := NewShardAssignmentDispatcher(health.NewServer())
coordinatorStream := newMockShardAssignmentControllerStream()
go func() {
err := dispatcher.PushShardAssignments(coordinatorStream)
Expand Down Expand Up @@ -76,12 +82,56 @@ func TestShardAssignmentDispatcher_Initialized(t *testing.T) {

}

// TestShardAssignmentDispatcher_ReadinessProbe verifies that the readiness
// health-check service is unknown to the health server until the dispatcher
// receives its first shard-assignment update, and reports SERVING afterwards.
func TestShardAssignmentDispatcher_ReadinessProbe(t *testing.T) {
	healthServer := health.NewServer()
	dispatcher := NewShardAssignmentDispatcher(healthServer)
	coordinatorStream := newMockShardAssignmentControllerStream()
	go func() {
		err := dispatcher.PushShardAssignments(coordinatorStream)
		assert.NoError(t, err)
	}()

	assert.False(t, dispatcher.Initialized())

	// Readiness probe should fail while not initialized: the readiness
	// service has not been registered with the health server yet, so the
	// check is expected to come back as NotFound with no response.
	resp, err := healthServer.Check(context.Background(), &grpc_health_v1.HealthCheckRequest{
		Service: container.ReadinessProbeService,
	})

	assert.Equal(t, codes.NotFound, status.Code(err))
	assert.Nil(t, resp)

	// Deliver the first shard-assignment update from the coordinator.
	coordinatorStream.AddRequest(&proto.ShardAssignments{
		Namespaces: map[string]*proto.NamespaceShardsAssignment{
			common.DefaultNamespace: {
				Assignments: []*proto.ShardAssignment{
					newShardAssignment(0, "server1", 0, 100),
					newShardAssignment(1, "server2", 100, math.MaxUint32),
				},
				ShardKeyRouter: proto.ShardKeyRouter_XXHASH3,
			},
		},
	})
	assert.Eventually(t, func() bool {
		return dispatcher.Initialized()
	}, 10*time.Second, 10*time.Millisecond)

	resp, err = healthServer.Check(context.Background(), &grpc_health_v1.HealthCheckRequest{
		Service: container.ReadinessProbeService,
	})
	assert.NoError(t, err)
	// assert.NoError does not stop the test, so resp may be nil here if the
	// check failed. Use the nil-safe generated getter so the test reports a
	// clean assertion failure instead of panicking on a nil dereference.
	assert.Equal(t, grpc_health_v1.HealthCheckResponse_SERVING, resp.GetStatus())

	assert.NoError(t, dispatcher.Close())
}

func TestShardAssignmentDispatcher_AddClient(t *testing.T) {
shard0InitialAssignment := newShardAssignment(0, "server1", 0, 100)
shard1InitialAssignment := newShardAssignment(1, "server2", 100, math.MaxUint32)
shard1UpdatedAssignment := newShardAssignment(1, "server3", 100, math.MaxUint32)

dispatcher := NewShardAssignmentDispatcher()
dispatcher := NewShardAssignmentDispatcher(health.NewServer())

coordinatorStream := newMockShardAssignmentControllerStream()
go func() {
Expand Down Expand Up @@ -163,7 +213,7 @@ func TestShardAssignmentDispatcher_AddClient(t *testing.T) {
}

func TestShardAssignmentDispatcher_MultipleNamespaces(t *testing.T) {
dispatcher := NewShardAssignmentDispatcher()
dispatcher := NewShardAssignmentDispatcher(health.NewServer())

coordinatorStream := newMockShardAssignmentControllerStream()
go func() {
Expand Down
7 changes: 4 additions & 3 deletions server/internal_rpc_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,12 @@ type internalRpcServer struct {
log zerolog.Logger
}

func newInternalRpcServer(grpcProvider container.GrpcProvider, bindAddress string, shardsDirector ShardsDirector, assignmentDispatcher ShardAssignmentsDispatcher) (*internalRpcServer, error) {
func newInternalRpcServer(grpcProvider container.GrpcProvider, bindAddress string, shardsDirector ShardsDirector,
assignmentDispatcher ShardAssignmentsDispatcher, healthServer *health.Server) (*internalRpcServer, error) {
server := &internalRpcServer{
shardsDirector: shardsDirector,
assignmentDispatcher: assignmentDispatcher,
healthServer: health.NewServer(),
healthServer: healthServer,
log: log.With().
Str("component", "internal-rpc-server").
Logger(),
Expand All @@ -67,7 +68,6 @@ func newInternalRpcServer(grpcProvider container.GrpcProvider, bindAddress strin
}

func (s *internalRpcServer) Close() error {
s.healthServer.Shutdown()
return s.grpcServer.Close()
}

Expand All @@ -82,6 +82,7 @@ func (s *internalRpcServer) PushShardAssignments(srv proto.OxiaCoordination_Push
Str("peer", common.GetPeer(srv.Context())).
Msg("Failed to provide shards assignments updates")
}

return err
}

Expand Down
5 changes: 4 additions & 1 deletion server/internal_rpc_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,16 @@ import (
"github.com/stretchr/testify/assert"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/health"
"google.golang.org/grpc/health/grpc_health_v1"
"oxia/common/container"
"testing"
)

func TestInternalHealthCheck(t *testing.T) {
server, err := newInternalRpcServer(container.Default, "localhost:0", nil, NewShardAssignmentDispatcher())
healthServer := health.NewServer()
server, err := newInternalRpcServer(container.Default, "localhost:0", nil,
NewShardAssignmentDispatcher(healthServer), healthServer)
assert.NoError(t, err)

target := fmt.Sprintf("localhost:%d", server.grpcServer.Port())
Expand Down
12 changes: 9 additions & 3 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package server
import (
"github.com/rs/zerolog/log"
"go.uber.org/multierr"
"google.golang.org/grpc/health"
"oxia/common/container"
"oxia/common/metrics"
"oxia/server/kv"
Expand Down Expand Up @@ -45,6 +46,8 @@ type Server struct {
metrics *metrics.PrometheusMetrics
walFactory wal.WalFactory
kvFactory kv.KVFactory

healthServer *health.Server
}

func New(config Config) (*Server, error) {
Expand All @@ -69,14 +72,15 @@ func NewWithGrpcProvider(config Config, provider container.GrpcProvider, replica
walFactory: wal.NewWalFactory(&wal.WalFactoryOptions{
LogDir: config.WalDir,
}),
kvFactory: kvFactory,
kvFactory: kvFactory,
healthServer: health.NewServer(),
}

s.shardsDirector = NewShardsDirector(config, s.walFactory, s.kvFactory, replicationRpcProvider)
s.shardAssignmentDispatcher = NewShardAssignmentDispatcher()
s.shardAssignmentDispatcher = NewShardAssignmentDispatcher(s.healthServer)

s.internalRpcServer, err = newInternalRpcServer(provider, config.InternalServiceAddr,
s.shardsDirector, s.shardAssignmentDispatcher)
s.shardsDirector, s.shardAssignmentDispatcher, s.healthServer)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -105,6 +109,8 @@ func (s *Server) InternalPort() int {
}

func (s *Server) Close() error {
s.healthServer.Shutdown()

err := multierr.Combine(
s.shardAssignmentDispatcher.Close(),
s.shardsDirector.Close(),
Expand Down