Skip to content

Commit

Permalink
Add minimal ChainReader interface & types (#196)
Browse files Browse the repository at this point in the history
Co-authored-by: Jordan Krage <[email protected]>
Co-authored-by: Ryan Tinianov <[email protected]>
Co-authored-by: ilija42 <[email protected]>
  • Loading branch information
4 people authored and samsondav committed Jan 11, 2024
1 parent f1aaf35 commit b1b8d22
Show file tree
Hide file tree
Showing 40 changed files with 1,439 additions and 145 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/pkg.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,4 @@ jobs:
uses: actions/upload-artifact@v3
with:
name: go-test-results
path: ./pkg_coverage.out
path: ./pkg_coverage.out
2 changes: 1 addition & 1 deletion .tool-versions
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
golang 1.21.1
golang 1.21.4
golangci-lint 1.55.2
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ godoc:

PHONY: install-protoc
install-protoc:
script/install-protoc.sh 24.2 /
script/install-protoc.sh 25.1 /
go install google.golang.org/protobuf/cmd/[email protected]; go install google.golang.org/grpc/cmd/[email protected]

.PHONY: mockery
Expand All @@ -26,4 +26,4 @@ generate: mockery install-protoc
.PHONY: golangci-lint
golangci-lint: ## Run golangci-lint for all issues.
[ -d "./golangci-lint" ] || mkdir ./golangci-lint && \
docker run --rm -v $(shell pwd):/app -w /app golangci/golangci-lint:v1.55.2 golangci-lint run --max-issues-per-linter 0 --max-same-issues 0 > ./golangci-lint/$(shell date +%Y-%m-%d_%H:%M:%S).txt
docker run --rm -v $(shell pwd):/app -w /app golangci/golangci-lint:v1.55.2 golangci-lint run --max-issues-per-linter 0 --max-same-issues 0 > ./golangci-lint/$(shell date +%Y-%m-%d_%H:%M:%S).txt
18 changes: 13 additions & 5 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,17 @@ module github.com/smartcontractkit/chainlink-common
go 1.21

require (
github.com/confluentinc/confluent-kafka-go v1.9.2
github.com/confluentinc/confluent-kafka-go/v2 v2.3.0
github.com/fxamacker/cbor/v2 v2.5.0
github.com/go-json-experiment/json v0.0.0-20231102232822-2e55bd4e08b0
github.com/google/uuid v1.3.1
github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus v1.0.0
github.com/hashicorp/go-hclog v1.5.0
github.com/hashicorp/go-plugin v1.5.2
github.com/jmoiron/sqlx v1.3.5
github.com/jpillora/backoff v1.0.0
github.com/linkedin/goavro/v2 v2.12.0
github.com/mitchellh/mapstructure v1.5.0
github.com/mwitkow/grpc-proxy v0.0.0-20230212185441-f345521cb9c9
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.17.0
Expand Down Expand Up @@ -41,6 +44,7 @@ require (
github.com/go-logr/stdr v1.2.2 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/golang/snappy v0.0.5-0.20220116011046-fa5810519dcb // indirect
github.com/google/go-cmp v0.6.0 // indirect
github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.0.0-rc.3 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0 // indirect
github.com/hashicorp/yamux v0.0.0-20180604194846-3520598351bb // indirect
Expand All @@ -54,16 +58,17 @@ require (
github.com/prometheus/client_model v0.4.1-0.20230718164431-9a2bf3000d16 // indirect
github.com/prometheus/common v0.44.0 // indirect
github.com/prometheus/procfs v0.11.1 // indirect
github.com/santhosh-tekuri/jsonschema/v5 v5.1.1 // indirect
github.com/santhosh-tekuri/jsonschema/v5 v5.2.0 // indirect
github.com/stretchr/objx v0.5.0 // indirect
github.com/x448/float16 v0.8.4 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.18.0 // indirect
go.opentelemetry.io/otel/metric v1.19.0 // indirect
go.opentelemetry.io/proto/otlp v1.0.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/crypto v0.13.0 // indirect
golang.org/x/net v0.15.0 // indirect
golang.org/x/crypto v0.14.0 // indirect
golang.org/x/net v0.17.0 // indirect
golang.org/x/sync v0.3.0 // indirect
golang.org/x/sys v0.12.0 // indirect
golang.org/x/sys v0.13.0 // indirect
golang.org/x/text v0.13.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20230711160842-782d3b101e98 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20230711160842-782d3b101e98 // indirect
Expand All @@ -74,6 +79,9 @@ replace (
// until merged upstream: https://github.com/hashicorp/go-plugin/pull/257
github.com/hashicorp/go-plugin => github.com/smartcontractkit/go-plugin v0.0.0-20231003134350-e49dad63b306

// until merged upstream: https://github.com/mitchellh/mapstructure/pull/343
github.com/mitchellh/mapstructure v1.5.0 => github.com/nolag/mapstructure v1.5.1

// until merged upstream: https://github.com/mwitkow/grpc-proxy/pull/69
github.com/mwitkow/grpc-proxy => github.com/smartcontractkit/grpc-proxy v0.0.0-20230731113816-f1be6620749f
)
155 changes: 64 additions & 91 deletions go.sum

Large diffs are not rendered by default.

132 changes: 132 additions & 0 deletions pkg/loop/internal/chain_reader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
package internal

import (
"context"
jsonv1 "encoding/json"
"fmt"

"github.com/fxamacker/cbor/v2"
jsonv2 "github.com/go-json-experiment/json"

"github.com/smartcontractkit/chainlink-common/pkg/loop/internal/pb"
"github.com/smartcontractkit/chainlink-common/pkg/types"
)

var _ types.ChainReader = (*chainReaderClient)(nil)

type chainReaderClient struct {
*brokerExt
grpc pb.ChainReaderClient
}

// enum of all known encoding formats for versioned data
const (
JSONEncodingVersion1 = iota
JSONEncodingVersion2
CBOREncodingVersion
)

// Version to be used for encoding (version used for decoding is determined by data received)
const CurrentEncodingVersion = CBOREncodingVersion

func encodeVersionedBytes(data any, version int32) (*pb.VersionedBytes, error) {
var bytes []byte
var err error

switch version {
case JSONEncodingVersion1:
bytes, err = jsonv1.Marshal(data)
if err != nil {
return nil, fmt.Errorf("%w: %w", types.ErrInvalidType, err)
}
case JSONEncodingVersion2:
bytes, err = jsonv2.Marshal(data)
if err != nil {
return nil, fmt.Errorf("%w: %w", types.ErrInvalidType, err)
}
case CBOREncodingVersion:
enco := cbor.CoreDetEncOptions()
enco.Time = cbor.TimeRFC3339Nano
var enc cbor.EncMode
enc, err = enco.EncMode()
if err != nil {
return nil, err
}
bytes, err = enc.Marshal(data)
if err != nil {
return nil, fmt.Errorf("%w: %w", types.ErrInvalidType, err)
}
default:
return nil, fmt.Errorf("unsupported encoding version %d for data %v", version, data)
}

return &pb.VersionedBytes{Version: uint32(version), Data: bytes}, nil
}

func decodeVersionedBytes(res any, vData *pb.VersionedBytes) error {
var err error
switch vData.Version {
case JSONEncodingVersion1:
err = jsonv1.Unmarshal(vData.Data, res)
case JSONEncodingVersion2:
err = jsonv2.Unmarshal(vData.Data, res)
case CBOREncodingVersion:
err = cbor.Unmarshal(vData.Data, res)
default:
return fmt.Errorf("unsupported encoding version %d for versionedData %v", vData.Version, vData.Data)
}

if err != nil {
return fmt.Errorf("%w: %w", types.ErrInvalidType, err)
}
return nil
}

func (c *chainReaderClient) GetLatestValue(ctx context.Context, bc types.BoundContract, method string, params, retVal any) error {
versionedParams, err := encodeVersionedBytes(params, CurrentEncodingVersion)
if err != nil {
return err
}

boundContract := pb.BoundContract{Name: bc.Name, Address: bc.Address, Pending: bc.Pending}

reply, err := c.grpc.GetLatestValue(ctx, &pb.GetLatestValueRequest{Bc: &boundContract, Method: method, Params: versionedParams})
if err != nil {
return err
}

return decodeVersionedBytes(retVal, reply.RetVal)
}

var _ pb.ChainReaderServer = (*chainReaderServer)(nil)

type chainReaderServer struct {
pb.UnimplementedChainReaderServer
impl types.ChainReader
}

func (c *chainReaderServer) GetLatestValue(ctx context.Context, request *pb.GetLatestValueRequest) (*pb.GetLatestValueReply, error) {
var bc types.BoundContract
bc.Name = request.Bc.Name[:]
bc.Address = request.Bc.Address[:]
bc.Pending = request.Bc.Pending

params := &map[string]any{}

if err := decodeVersionedBytes(params, request.Params); err != nil {
return nil, err
}

retVal := &map[string]any{}
err := c.impl.GetLatestValue(ctx, bc, request.Method, params, retVal)
if err != nil {
return nil, err
}

encodedRetVal, err := encodeVersionedBytes(retVal, CurrentEncodingVersion)
if err != nil {
return nil, err
}

return &pb.GetLatestValueReply{RetVal: encodedRetVal}, nil
}
Loading

0 comments on commit b1b8d22

Please sign in to comment.