Skip to content

Commit

Permalink
Merge pull request #5888 from oasisprotocol/kostko/feature/rt-client-…
Browse files Browse the repository at this point in the history
…storage

go/runtime/client: Expose state proofs via gRPC
  • Loading branch information
kostko authored Oct 9, 2024
2 parents 082b153 + 8c82d6b commit 807b24d
Show file tree
Hide file tree
Showing 4 changed files with 167 additions and 0 deletions.
1 change: 1 addition & 0 deletions .changelog/5888.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
go/runtime/client: Expose state proofs via gRPC
5 changes: 5 additions & 0 deletions go/runtime/client/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/oasisprotocol/oasis-core/go/roothash/api/block"
"github.com/oasisprotocol/oasis-core/go/runtime/bundle/component"
"github.com/oasisprotocol/oasis-core/go/runtime/host/protocol"
"github.com/oasisprotocol/oasis-core/go/storage/mkvs/syncer"
)

const (
Expand Down Expand Up @@ -85,6 +86,10 @@ type RuntimeClient interface {

// WatchBlocks subscribes to blocks for a specific runtimes.
WatchBlocks(ctx context.Context, runtimeID common.Namespace) (<-chan *roothash.AnnotatedBlock, pubsub.ClosableSubscription, error)

// State returns a MKVS read syncer that can be used to read runtime state from a remote node
// and verify it against the trusted local root.
State() syncer.ReadSyncer
}

// SubmitTxResult is the raw result of submitting a transaction for processing.
Expand Down
123 changes: 123 additions & 0 deletions go/runtime/client/api/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/oasisprotocol/oasis-core/go/common/pubsub"
roothash "github.com/oasisprotocol/oasis-core/go/roothash/api"
"github.com/oasisprotocol/oasis-core/go/roothash/api/block"
"github.com/oasisprotocol/oasis-core/go/storage/mkvs/syncer"
)

var (
Expand Down Expand Up @@ -43,6 +44,12 @@ var (
methodGetEvents = serviceName.NewMethod("GetEvents", GetEventsRequest{})
// methodQuery is the Query method.
methodQuery = serviceName.NewMethod("Query", QueryRequest{})
// methodStateSyncGet is the StateSyncGet method.
methodStateSyncGet = serviceName.NewMethod("StateSyncGet", syncer.GetRequest{})
// methodStateSyncGetPrefixes is the StateSyncGetPrefixes method.
methodStateSyncGetPrefixes = serviceName.NewMethod("StateSyncGetPrefixes", syncer.GetPrefixesRequest{})
// methodStateSyncIterate is the StateSyncIterate method.
methodStateSyncIterate = serviceName.NewMethod("StateSyncIterate", syncer.IterateRequest{})

// methodWatchBlocks is the WatchBlocks method.
methodWatchBlocks = serviceName.NewMethod("WatchBlocks", common.Namespace{})
Expand Down Expand Up @@ -100,6 +107,18 @@ var (
MethodName: methodQuery.ShortName(),
Handler: handlerQuery,
},
{
MethodName: methodStateSyncGet.ShortName(),
Handler: handlerStateSyncGet,
},
{
MethodName: methodStateSyncGetPrefixes.ShortName(),
Handler: handlerStateSyncGetPrefixes,
},
{
MethodName: methodStateSyncIterate.ShortName(),
Handler: handlerStateSyncIterate,
},
},
Streams: []grpc.StreamDesc{
{
Expand Down Expand Up @@ -422,6 +441,75 @@ func handlerQuery( // nolint: revive
return interceptor(ctx, &rq, info, handler)
}

func handlerStateSyncGet(
srv interface{},
ctx context.Context,
dec func(interface{}) error,
interceptor grpc.UnaryServerInterceptor,
) (interface{}, error) {
rq := new(syncer.GetRequest)
if err := dec(rq); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(RuntimeClient).State().SyncGet(ctx, rq)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: methodStateSyncGet.FullName(),
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(RuntimeClient).State().SyncGet(ctx, req.(*syncer.GetRequest))
}
return interceptor(ctx, rq, info, handler)
}

func handlerStateSyncGetPrefixes(
srv interface{},
ctx context.Context,
dec func(interface{}) error,
interceptor grpc.UnaryServerInterceptor,
) (interface{}, error) {
rq := new(syncer.GetPrefixesRequest)
if err := dec(rq); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(RuntimeClient).State().SyncGetPrefixes(ctx, rq)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: methodStateSyncGetPrefixes.FullName(),
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(RuntimeClient).State().SyncGetPrefixes(ctx, req.(*syncer.GetPrefixesRequest))
}
return interceptor(ctx, rq, info, handler)
}

func handlerStateSyncIterate(
srv interface{},
ctx context.Context,
dec func(interface{}) error,
interceptor grpc.UnaryServerInterceptor,
) (interface{}, error) {
rq := new(syncer.IterateRequest)
if err := dec(rq); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(RuntimeClient).State().SyncIterate(ctx, rq)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: methodStateSyncIterate.FullName(),
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(RuntimeClient).State().SyncIterate(ctx, req.(*syncer.IterateRequest))
}
return interceptor(ctx, rq, info, handler)
}

func handlerWatchBlocks(srv interface{}, stream grpc.ServerStream) error {
var runtimeID common.Namespace
if err := stream.RecvMsg(&runtimeID); err != nil {
Expand Down Expand Up @@ -548,6 +636,41 @@ func (c *runtimeClient) Query(ctx context.Context, request *QueryRequest) (*Quer
return &rsp, nil
}

type stateReadSync struct {
c *runtimeClient
}

// Implements syncer.ReadSyncer.
func (rs *stateReadSync) SyncGet(ctx context.Context, request *syncer.GetRequest) (*syncer.ProofResponse, error) {
var rsp syncer.ProofResponse
if err := rs.c.conn.Invoke(ctx, methodStateSyncGet.FullName(), request, &rsp); err != nil {
return nil, err
}
return &rsp, nil
}

// Implements syncer.ReadSyncer.
func (rs *stateReadSync) SyncGetPrefixes(ctx context.Context, request *syncer.GetPrefixesRequest) (*syncer.ProofResponse, error) {
var rsp syncer.ProofResponse
if err := rs.c.conn.Invoke(ctx, methodStateSyncGetPrefixes.FullName(), request, &rsp); err != nil {
return nil, err
}
return &rsp, nil
}

// Implements syncer.ReadSyncer.
func (rs *stateReadSync) SyncIterate(ctx context.Context, request *syncer.IterateRequest) (*syncer.ProofResponse, error) {
var rsp syncer.ProofResponse
if err := rs.c.conn.Invoke(ctx, methodStateSyncIterate.FullName(), request, &rsp); err != nil {
return nil, err
}
return &rsp, nil
}

func (c *runtimeClient) State() syncer.ReadSyncer {
return &stateReadSync{c}
}

func (c *runtimeClient) WatchBlocks(ctx context.Context, runtimeID common.Namespace) (<-chan *roothash.AnnotatedBlock, pubsub.ClosableSubscription, error) {
ctx, sub := pubsub.NewContextSubscription(ctx)

Expand Down
38 changes: 38 additions & 0 deletions go/worker/client/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@ import (
"github.com/oasisprotocol/oasis-core/go/roothash/api/block"
"github.com/oasisprotocol/oasis-core/go/runtime/client/api"
"github.com/oasisprotocol/oasis-core/go/runtime/host/protocol"
runtimeRegistry "github.com/oasisprotocol/oasis-core/go/runtime/registry"
"github.com/oasisprotocol/oasis-core/go/runtime/transaction"
storage "github.com/oasisprotocol/oasis-core/go/storage/api"
"github.com/oasisprotocol/oasis-core/go/storage/mkvs/syncer"
"github.com/oasisprotocol/oasis-core/go/worker/client/committee"
)

Expand Down Expand Up @@ -303,3 +305,39 @@ func (s *service) Query(ctx context.Context, request *api.QueryRequest) (*api.Qu
}
return &api.QueryResponse{Data: data}, nil
}

// Implements api.RuntimeClient.
func (s *service) State() syncer.ReadSyncer {
return &storageRouter{r: s.w.commonWorker.RuntimeRegistry}
}

type storageRouter struct {
r runtimeRegistry.Registry
}

// Implements syncer.ReadSyncer.
func (sr *storageRouter) SyncGet(ctx context.Context, request *syncer.GetRequest) (*syncer.ProofResponse, error) {
rt, err := sr.r.GetRuntime(request.Tree.Root.Namespace)
if err != nil {
return nil, err
}
return rt.Storage().SyncGet(ctx, request)
}

// Implements syncer.ReadSyncer.
func (sr *storageRouter) SyncGetPrefixes(ctx context.Context, request *syncer.GetPrefixesRequest) (*syncer.ProofResponse, error) {
rt, err := sr.r.GetRuntime(request.Tree.Root.Namespace)
if err != nil {
return nil, err
}
return rt.Storage().SyncGetPrefixes(ctx, request)
}

// Implements syncer.ReadSyncer.
func (sr *storageRouter) SyncIterate(ctx context.Context, request *syncer.IterateRequest) (*syncer.ProofResponse, error) {
rt, err := sr.r.GetRuntime(request.Tree.Root.Namespace)
if err != nil {
return nil, err
}
return rt.Storage().SyncIterate(ctx, request)
}

0 comments on commit 807b24d

Please sign in to comment.