diff --git a/src/constants/errors.go b/src/constants/errors.go index 04bccafc..92c7443e 100644 --- a/src/constants/errors.go +++ b/src/constants/errors.go @@ -3,7 +3,6 @@ package constants import "errors" var ( - ErrInvalidPacket = errors.New("packet.invalid") ErrCantSendPacket = errors.New("socket.unreachable") ErrInvalidKosk = errors.New("kosk.invalid") ErrInvalidConfig = errors.New("conf.invalid") diff --git a/src/consumers/consumers.go b/src/consumers/consumers.go index 61978c2e..d4bd9a93 100644 --- a/src/consumers/consumers.go +++ b/src/consumers/consumers.go @@ -10,31 +10,14 @@ import ( "github.com/KenshiTech/unchained/plugins/logs" "github.com/KenshiTech/unchained/plugins/uniswap" "github.com/gorilla/websocket" - "github.com/vmihailenco/msgpack/v5" + + sia "github.com/pouya-eghbali/go-sia/v2/pkg" ) // TODO: These functions share a huge chunk of code func ConsumePriceReport(message []byte) { - var packet datasets.BroadcastPricePacket - err := msgpack.Unmarshal(message, &packet) - if err != nil { - log.Logger. - With("Error", err). - Error("Unmarshal error") - - return - } - - toHash, err := msgpack.Marshal(&packet.Info) - - if err != nil { - log.Logger. - With("Error", err). - Error("Marshal error") - - return - } - + packet := new(datasets.BroadcastPricePacket).DeSia(&sia.Sia{Content: message}) + toHash := packet.Info.Sia().Content hash, err := bls.Hash(toHash) if err != nil { @@ -66,27 +49,8 @@ func ConsumePriceReport(message []byte) { } func ConsumeEventLog(message []byte) { - var packet datasets.BroadcastEventPacket - err := msgpack.Unmarshal(message, &packet) - - if err != nil { - log.Logger. - With("Error", err). - Error("Unmarshal error") - - return - } - - toHash, err := msgpack.Marshal(&packet.Info) - - if err != nil { - log.Logger. - With("Error", err). - Error("Marshal error") - - return - } - + packet := new(datasets.BroadcastEventPacket).DeSia(&sia.Sia{Content: message}) + toHash := packet.Info.Sia().Content hash, err := bls.Hash(toHash) if err != nil { @@ -118,27 +82,8 @@ func ConsumeEventLog(message []byte) { } func ConsumeCorrectnessReport(message []byte) { - var packet datasets.BroadcastCorrectnessPacket - err := msgpack.Unmarshal(message, &packet) - - if err != nil { - log.Logger. - With("Error", err). - Error("Unmarshal error") - - return - } - - toHash, err := msgpack.Marshal(&packet.Info) - - if err != nil { - log.Logger. - With("Error", err). - Error("Marshal error") - - return - } - + packet := new(datasets.BroadcastCorrectnessPacket).DeSia(&sia.Sia{Content: message}) + toHash := packet.Info.Sia().Content hash, err := bls.Hash(toHash) if err != nil { diff --git a/src/crypto/bls/bls.go b/src/crypto/bls/bls.go index 47810688..e5b6b4d4 100644 --- a/src/crypto/bls/bls.go +++ b/src/crypto/bls/bls.go @@ -8,19 +8,6 @@ import ( bls12381_fr "github.com/consensys/gnark-crypto/ecc/bls12-381/fr" ) -type Signer struct { - Name string - EvmWallet string - PublicKey [96]byte - ShortPublicKey [48]byte -} - -type Signature struct { - Signature bls12381.G1Affine - Signer Signer - Processed bool -} - var ( g2Aff bls12381.G2Affine g1Aff bls12381.G1Affine diff --git a/src/crypto/bls/store.go b/src/crypto/bls/store.go index 4f1d3ae7..a91c324f 100644 --- a/src/crypto/bls/store.go +++ b/src/crypto/bls/store.go @@ -6,6 +6,7 @@ import ( "github.com/KenshiTech/unchained/address" "github.com/KenshiTech/unchained/config" + "github.com/KenshiTech/unchained/datasets" "github.com/KenshiTech/unchained/log" "github.com/btcsuite/btcutil/base58" @@ -16,7 +17,7 @@ import ( var ClientSecretKey *big.Int var ClientPublicKey *bls12381.G2Affine var ClientShortPublicKey *bls12381.G1Affine -var ClientSigner Signer +var ClientSigner datasets.Signer func saveConfig() { pkBytes := ClientPublicKey.Bytes() @@ -66,7 +67,7 @@ func InitClientIdentity() { pkBytes := ClientPublicKey.Bytes() addrStr := address.Calculate(pkBytes[:]) - ClientSigner = Signer{ + ClientSigner = datasets.Signer{ Name: config.Config.GetString("name"), EvmWallet: config.Secrets.GetString("evmwallet"), PublicKey: ClientPublicKey.Bytes(), diff --git a/src/datasets/bls.go b/src/datasets/bls.go new file mode 100644 index 00000000..552da7a6 --- /dev/null +++ b/src/datasets/bls.go @@ -0,0 +1,56 @@ +package datasets + +import ( + bls12381 "github.com/consensys/gnark-crypto/ecc/bls12-381" + sia "github.com/pouya-eghbali/go-sia/v2/pkg" +) + +type Signer struct { + Name string + EvmWallet string + PublicKey [96]byte + ShortPublicKey [48]byte +} + +type Signature struct { + Signature bls12381.G1Affine + Signer Signer + Processed bool +} + +func (s *Signature) Sia() *sia.Sia { + return new(sia.Sia). + AddByteArray8(s.Signature.Marshal()). + EmbedSia(s.Signer.Sia()). + AddBool(s.Processed) +} + +func (s *Signature) DeSia(sia *sia.Sia) *Signature { + err := s.Signature.Unmarshal(sia.ReadByteArray8()) + + if err != nil { + s.Signature = bls12381.G1Affine{} + } + + s.Signer.DeSia(sia) + s.Processed = sia.ReadBool() + + return s +} + +func (s *Signer) Sia() *sia.Sia { + return new(sia.Sia). + AddString8(s.Name). + AddString8(s.EvmWallet). + AddByteArray8(s.PublicKey[:]). + AddByteArray8(s.ShortPublicKey[:]) +} + +func (s *Signer) DeSia(sia *sia.Sia) *Signer { + s.Name = sia.ReadString8() + s.EvmWallet = sia.ReadString8() + copy(s.PublicKey[:], sia.ReadByteArray8()) + copy(s.ShortPublicKey[:], sia.ReadByteArray8()) + + return s +} diff --git a/src/datasets/correctness.go b/src/datasets/correctness.go index d2fa1a42..59b0c41e 100644 --- a/src/datasets/correctness.go +++ b/src/datasets/correctness.go @@ -1,7 +1,7 @@ package datasets import ( - "github.com/KenshiTech/unchained/crypto/bls" + sia "github.com/pouya-eghbali/go-sia/v2/pkg" ) type Correctness struct { @@ -19,5 +19,50 @@ type CorrectnessReport struct { type BroadcastCorrectnessPacket struct { Info Correctness Signature [48]byte - Signer bls.Signer + Signer Signer +} + +func (c *Correctness) Sia() *sia.Sia { + return new(sia.Sia). + AddUInt64(c.Timestamp). + AddByteArray8(c.Hash[:]). + AddByteArray8(c.Topic[:]). + AddBool(c.Correct) +} + +func (c *Correctness) DeSia(sia *sia.Sia) *Correctness { + c.Timestamp = sia.ReadUInt64() + copy(c.Hash[:], sia.ReadByteArray8()) + copy(c.Topic[:], sia.ReadByteArray8()) + c.Correct = sia.ReadBool() + + return c +} + +func (c *CorrectnessReport) Sia() *sia.Sia { + return new(sia.Sia). + EmbedSia(c.Correctness.Sia()). + AddByteArray8(c.Signature[:]) +} + +func (c *CorrectnessReport) DeSia(sia *sia.Sia) *CorrectnessReport { + c.Correctness.DeSia(sia) + copy(c.Signature[:], sia.ReadByteArray8()) + + return c +} + +func (b *BroadcastCorrectnessPacket) Sia() *sia.Sia { + return new(sia.Sia). + EmbedSia(b.Info.Sia()). + AddByteArray8(b.Signature[:]). + EmbedSia(b.Signer.Sia()) +} + +func (b *BroadcastCorrectnessPacket) DeSia(sia *sia.Sia) *BroadcastCorrectnessPacket { + b.Info.DeSia(sia) + copy(b.Signature[:], sia.ReadByteArray8()) + b.Signer.DeSia(sia) + + return b } diff --git a/src/datasets/logs.go b/src/datasets/logs.go index 26091aed..d2aee739 100644 --- a/src/datasets/logs.go +++ b/src/datasets/logs.go @@ -3,8 +3,7 @@ package datasets import ( "encoding/json" - "github.com/KenshiTech/unchained/crypto/bls" - "github.com/vmihailenco/msgpack/v5" + sia "github.com/pouya-eghbali/go-sia/v2/pkg" ) type EventLogArg struct { @@ -13,27 +12,6 @@ type EventLogArg struct { Value any `json:"Value"` } -var _ msgpack.CustomEncoder = (*EventLogArg)(nil) - -// TODO: this can be improved -func (eventLog *EventLogArg) EncodeMsgpack(enc *msgpack.Encoder) error { - encoded, err := json.Marshal(eventLog) - if err != nil { - return err - } - return enc.EncodeBytes(encoded) -} - -var _ msgpack.CustomDecoder = (*EventLogArg)(nil) - -func (eventLog *EventLogArg) DecodeMsgpack(dec *msgpack.Decoder) error { - bytes, err := dec.DecodeBytes() - if err != nil { - return err - } - return json.Unmarshal(bytes, eventLog) -} - type EventLog struct { LogIndex uint64 Block uint64 @@ -52,5 +30,70 @@ type EventLogReport struct { type BroadcastEventPacket struct { Info EventLog Signature [48]byte - Signer bls.Signer + Signer Signer +} + +func (e *EventLog) Sia() *sia.Sia { + argsEncoded, err := json.Marshal(e.Args) + + if err != nil { + panic(err) + } + + sia := new(sia.Sia). + AddUInt64(e.LogIndex). + AddUInt64(e.Block). + AddString8(e.Address). + AddString8(e.Event). + AddString8(e.Chain). + AddByteArray8(e.TxHash[:]). + AddByteArray16(argsEncoded) + + return sia +} + +func (e *EventLog) DeSia(sia *sia.Sia) *EventLog { + e.LogIndex = sia.ReadUInt64() + e.Block = sia.ReadUInt64() + e.Address = sia.ReadString8() + e.Event = sia.ReadString8() + e.Chain = sia.ReadString8() + copy(e.TxHash[:], sia.ReadByteArray8()) + + argsEncoded := sia.ReadByteArray16() + err := json.Unmarshal(argsEncoded, &e.Args) + + if err != nil { + panic(err) + } + + return e +} + +func (e *EventLogReport) Sia() *sia.Sia { + return new(sia.Sia). + EmbedSia(e.EventLog.Sia()). + AddByteArray8(e.Signature[:]) +} + +func (e *EventLogReport) DeSia(sia *sia.Sia) *EventLogReport { + e.EventLog.DeSia(sia) + copy(e.Signature[:], sia.ReadByteArray8()) + + return e +} + +func (b *BroadcastEventPacket) Sia() *sia.Sia { + return new(sia.Sia). + EmbedSia(b.Info.Sia()). + AddByteArray8(b.Signature[:]). + EmbedSia(b.Signer.Sia()) +} + +func (b *BroadcastEventPacket) DeSia(sia *sia.Sia) *BroadcastEventPacket { + b.Info.DeSia(sia) + copy(b.Signature[:], sia.ReadByteArray8()) + b.Signer.DeSia(sia) + + return b } diff --git a/src/datasets/uniswap.go b/src/datasets/uniswap.go index 63518325..e1206f5b 100644 --- a/src/datasets/uniswap.go +++ b/src/datasets/uniswap.go @@ -3,7 +3,7 @@ package datasets import ( "math/big" - "github.com/KenshiTech/unchained/crypto/bls" + sia "github.com/pouya-eghbali/go-sia/v2/pkg" ) type TokenKey struct { @@ -33,5 +33,80 @@ type PriceReport struct { type BroadcastPricePacket struct { Info PriceInfo Signature [48]byte - Signer bls.Signer + Signer Signer +} + +func (t *TokenKey) Sia() *sia.Sia { + return new(sia.Sia). + AddString8(t.Name). + AddString8(t.Pair). + AddString8(t.Chain). + AddInt64(t.Delta). + AddBool(t.Invert). + AddString8(t.Cross) +} + +func (t *TokenKey) DeSia(sia *sia.Sia) *TokenKey { + t.Name = sia.ReadString8() + t.Pair = sia.ReadString8() + t.Chain = sia.ReadString8() + t.Delta = sia.ReadInt64() + t.Invert = sia.ReadBool() + t.Cross = sia.ReadString8() + + return t +} + +func (a *AssetKey) Sia() *sia.Sia { + return new(sia.Sia). + EmbedSia(a.Token.Sia()). + AddUInt64(a.Block) +} + +func (a *AssetKey) DeSia(sia *sia.Sia) *AssetKey { + a.Token.DeSia(sia) + a.Block = sia.ReadUInt64() + + return a +} + +func (p *PriceInfo) Sia() *sia.Sia { + return new(sia.Sia). + EmbedSia(p.Asset.Sia()). + AddBigInt(&p.Price) +} + +func (p *PriceInfo) DeSia(sia *sia.Sia) *PriceInfo { + p.Asset.DeSia(sia) + p.Price = *sia.ReadBigInt() + + return p +} + +func (p *PriceReport) Sia() *sia.Sia { + return new(sia.Sia). + EmbedSia(p.PriceInfo.Sia()). + AddByteArray8(p.Signature[:]) +} + +func (p *PriceReport) DeSia(sia *sia.Sia) *PriceReport { + p.PriceInfo.DeSia(sia) + copy(p.Signature[:], sia.ReadByteArray8()) + + return p +} + +func (b *BroadcastPricePacket) Sia() *sia.Sia { + return new(sia.Sia). + EmbedSia(b.Info.Sia()). + AddByteArray8(b.Signature[:]). + EmbedSia(b.Signer.Sia()) +} + +func (b *BroadcastPricePacket) DeSia(sia *sia.Sia) *BroadcastPricePacket { + b.Info.DeSia(sia) + copy(b.Signature[:], sia.ReadByteArray8()) + b.Signer.DeSia(sia) + + return b } diff --git a/src/go.mod b/src/go.mod index 798378ec..0b37c69d 100644 --- a/src/go.mod +++ b/src/go.mod @@ -74,6 +74,7 @@ require ( github.com/mmcloughlin/addchain v0.4.0 // indirect github.com/pelletier/go-toml/v2 v2.2.0 // indirect github.com/pkg/errors v0.9.1 // indirect + github.com/pouya-eghbali/go-sia/v2 v2.0.0 // indirect github.com/robfig/cron/v3 v3.0.1 // indirect github.com/sagikazarmark/locafero v0.4.0 // indirect github.com/sagikazarmark/slog-shim v0.1.0 // indirect diff --git a/src/go.sum b/src/go.sum index 8287e802..4258b521 100644 --- a/src/go.sum +++ b/src/go.sum @@ -264,6 +264,8 @@ github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/pouya-eghbali/go-sia/v2 v2.0.0 h1:pks5GJlrEJSBc1gmfTQ7bM9jKpfSrwjR0jQYozilf64= +github.com/pouya-eghbali/go-sia/v2 v2.0.0/go.mod h1:Dfq83SuDGfQOmbwaA9/yKqFOwtzr30gySYO+f+7WjH0= github.com/prometheus/client_golang v1.12.0 h1:C+UIj/QWtmqY13Arb8kwMt5j34/0Z2iKamrJ+ryC0Gg= github.com/prometheus/client_golang v1.12.0/go.mod h1:3Z9XVyYiZYEO+YQWt3RD2R3jrbd179Rt297l4aS6nDY= github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= diff --git a/src/kosk/kosk.go b/src/kosk/kosk.go index 1a670451..47cdc068 100644 --- a/src/kosk/kosk.go +++ b/src/kosk/kosk.go @@ -6,6 +6,8 @@ import ( "crypto/rand" "github.com/KenshiTech/unchained/crypto/bls" + + sia "github.com/pouya-eghbali/go-sia/v2/pkg" ) const ( @@ -20,6 +22,21 @@ type Challenge struct { Signature [LenOfSignature]byte } +func (c *Challenge) Sia() *sia.Sia { + return new(sia.Sia). + AddBool(c.Passed). + AddByteArray8(c.Random[:]). + AddByteArray8(c.Signature[:]) +} + +func (c *Challenge) DeSia(sia *sia.Sia) *Challenge { + c.Passed = sia.ReadBool() + copy(c.Random[:], sia.ReadByteArray8()) + copy(c.Signature[:], sia.ReadByteArray8()) + + return c +} + func NewChallenge() [LenOfChallenge]byte { challenge := make([]byte, LenOfChallenge) _, err := rand.Read(challenge) diff --git a/src/net/client/client.go b/src/net/client/client.go index 792943dd..c12013a2 100644 --- a/src/net/client/client.go +++ b/src/net/client/client.go @@ -16,7 +16,8 @@ import ( "github.com/KenshiTech/unchained/net/shared" "github.com/gorilla/websocket" - "github.com/vmihailenco/msgpack/v5" + + sia "github.com/pouya-eghbali/go-sia/v2/pkg" ) var Done chan struct{} @@ -41,10 +42,7 @@ func StartClient() { Done = make(chan struct{}) hello := bls.ClientSigner - helloPayload, err := msgpack.Marshal(&hello) - if err != nil { - panic(err) - } + helloPayload := hello.Sia().Content go func() { defer close(Done) @@ -79,28 +77,13 @@ func StartClient() { case opcodes.KoskChallenge: // TODO: Refactor into a function - // TODO: Check for errors! - var challenge kosk.Challenge - err = msgpack.Unmarshal(payload[1:], &challenge) - if err != nil { - log.Logger. - With("Error", err). - Error("Can't unmarshal challenge") - continue - } - + challenge := new(kosk.Challenge).DeSia(&sia.Sia{Content: payload[1:]}) signature, _ := bls.Sign(*bls.ClientSecretKey, challenge.Random[:]) challenge.Signature = signature.Bytes() - koskPayload, err := msgpack.Marshal(challenge) - if err != nil { - log.Logger. - With("Error", err). - Error("Can't marshal challenge") - continue - } - + koskPayload := challenge.Sia().Content shared.Send(opcodes.KoskResult, koskPayload) + case opcodes.PriceReportBroadcast: go consumers.ConsumePriceReport(payload[1:]) diff --git a/src/net/server.go b/src/net/server.go index 3721ff8c..6dd9cd3d 100644 --- a/src/net/server.go +++ b/src/net/server.go @@ -18,54 +18,47 @@ import ( "github.com/gorilla/websocket" "github.com/puzpuzpuz/xsync/v3" - "github.com/vmihailenco/msgpack/v5" + + sia "github.com/pouya-eghbali/go-sia/v2/pkg" ) var challenges *xsync.MapOf[*websocket.Conn, kosk.Challenge] -var signers *xsync.MapOf[*websocket.Conn, bls.Signer] +var signers *xsync.MapOf[*websocket.Conn, datasets.Signer] var upgrader = websocket.Upgrader{} // use default options func processKosk(conn *websocket.Conn, payload []byte) error { - var challenge kosk.Challenge - err := msgpack.Unmarshal(payload, &challenge) - if err != nil { - log.Logger.Error("Can't unmarshal Msgpack: %v", err) - return constants.ErrInvalidPacket - } + challenge := new(kosk.Challenge).DeSia(&sia.Sia{Content: payload}) signer, ok := signers.Load(conn) if !ok { return constants.ErrMissingHello } + var err error challenge.Passed, err = kosk.VerifyChallenge(challenge.Random, signer.PublicKey, challenge.Signature) + if err != nil { return constants.ErrInvalidKosk } + if !challenge.Passed { log.Logger.Error("challenge is Passed") return constants.ErrInvalidKosk } - challenges.Store(conn, challenge) - + challenges.Store(conn, *challenge) return nil } func processHello(conn *websocket.Conn, payload []byte) ([]byte, error) { - var signer bls.Signer - err := msgpack.Unmarshal(payload, &signer) - if err != nil { - log.Logger.Error("Can't unmarshal packet: %v", err) - return []byte{}, constants.ErrInvalidPacket - } + signer := new(datasets.Signer).DeSia(&sia.Sia{Content: payload}) if signer.Name == "" { log.Logger.Error("Signer name is empty Or public key is invalid") return []byte{}, constants.ErrInvalidConfig } - signers.Range(func(conn *websocket.Conn, signerInMap bls.Signer) bool { + signers.Range(func(conn *websocket.Conn, signerInMap datasets.Signer) bool { publicKeyInUse := signerInMap.PublicKey == signer.PublicKey if publicKeyInUse { Close(conn) @@ -73,21 +66,17 @@ func processHello(conn *websocket.Conn, payload []byte) ([]byte, error) { return !publicKeyInUse }) - signers.Store(conn, signer) + signers.Store(conn, *signer) // Start KOSK verification challenge := kosk.Challenge{Random: kosk.NewChallenge()} challenges.Store(conn, challenge) - koskPayload, err := msgpack.Marshal(challenge) - if err != nil { - log.Logger.Error("Can't marshal challenge: %v", err) - return []byte{}, constants.ErrInternalError - } + koskPayload := challenge.Sia().Content return koskPayload, nil } -func checkPublicKey(conn *websocket.Conn) (*bls.Signer, error) { +func checkPublicKey(conn *websocket.Conn) (*datasets.Signer, error) { challenge, ok := challenges.Load(conn) if !ok || !challenge.Passed { return nil, constants.ErrMissingKosk @@ -108,20 +97,10 @@ func processPriceReport(conn *websocket.Conn, payload []byte) ([]byte, error) { return []byte{}, err } - var report datasets.PriceReport - err = msgpack.Unmarshal(payload, &report) - if err != nil { - log.Logger.Error("Can't unmarshal Msgpack: %v", err) - return []byte{}, constants.ErrInvalidPacket - } - - toHash, err := msgpack.Marshal(&report.PriceInfo) - if err != nil { - log.Logger.Error("Can't unmarshal Msgpack: %v", err) - return []byte{}, constants.ErrInvalidPacket - } - + report := new(datasets.PriceReport).DeSia(&sia.Sia{Content: payload}) + toHash := report.PriceInfo.Sia().Content hash, err := bls.Hash(toHash) + if err != nil { log.Logger.Error("Can't hash bls: %v", err) return []byte{}, constants.ErrInternalError @@ -154,16 +133,7 @@ func processPriceReport(conn *websocket.Conn, payload []byte) ([]byte, error) { Signer: *signer, } - priceInfoByte, err := msgpack.Marshal(&priceInfo) - // TODO: Handle this error properly - // TODO: Maybe notify the peer so they can resend - if err != nil { - log.Logger. - With("Error", err). - Error("Cannot marshal the broadcast packet") - return []byte{}, constants.ErrInternalError - } - + priceInfoByte := priceInfo.Sia().Content return priceInfoByte, nil } @@ -173,20 +143,10 @@ func processEventLog(conn *websocket.Conn, payload []byte) ([]byte, error) { return []byte{}, err } - var report datasets.EventLogReport - err = msgpack.Unmarshal(payload, &report) - if err != nil { - log.Logger.Error("Can't unmarshal Msgpack: %v", err) - return []byte{}, constants.ErrInvalidPacket - } - - toHash, err := msgpack.Marshal(&report.EventLog) - if err != nil { - log.Logger.Error("Can't unmarshal Msgpack: %v", err) - return []byte{}, constants.ErrInvalidPacket - } - + report := new(datasets.EventLogReport).DeSia(&sia.Sia{Content: payload}) + toHash := report.EventLog.Sia().Content hash, err := bls.Hash(toHash) + if err != nil { log.Logger.Error("Can't hash bls: %v", err) return []byte{}, constants.ErrInternalError @@ -219,17 +179,7 @@ func processEventLog(conn *websocket.Conn, payload []byte) ([]byte, error) { Signer: *signer, } - broadcastPayload, err := msgpack.Marshal(&broadcastPacket) - // TODO: Handle this error properly - // TODO: Maybe notify the peer so they can resend - if err != nil { - log.Logger. - With("Error", err). - Error("Cannot marshal the broadcast packet") - - return []byte{}, constants.ErrInternalError - } - + broadcastPayload := broadcastPacket.Sia().Content return broadcastPayload, nil } @@ -239,20 +189,10 @@ func processCorrectnessRecord(conn *websocket.Conn, payload []byte) ([]byte, err return []byte{}, err } - var report datasets.CorrectnessReport - err = msgpack.Unmarshal(payload, &report) - if err != nil { - log.Logger.Error("Can't unmarshal Msgpack: %v", err) - return []byte{}, constants.ErrInvalidPacket - } - - toHash, err := msgpack.Marshal(&report.Correctness) - if err != nil { - log.Logger.Error("Can't unmarshal Msgpack: %v", err) - return []byte{}, constants.ErrInvalidPacket - } - + report := new(datasets.CorrectnessReport).DeSia(&sia.Sia{Content: payload}) + toHash := report.Correctness.Sia().Content hash, err := bls.Hash(toHash) + if err != nil { log.Logger.Error("Can't hash bls: %v", err) return []byte{}, constants.ErrInternalError @@ -285,16 +225,7 @@ func processCorrectnessRecord(conn *websocket.Conn, payload []byte) ([]byte, err Signer: *signer, } - broadcastPayload, err := msgpack.Marshal(&broadcastPacket) - // TODO: Handle this error properly - // TODO: Maybe notify the peer so they can resend - if err != nil { - log.Logger. - With("Error", err). - Error("Cannot marshal the broadcast packet") - return []byte{}, constants.ErrInternalError - } - + broadcastPayload := broadcastPacket.Sia().Content return broadcastPayload, nil } @@ -399,6 +330,6 @@ func StartServer() { } func init() { - signers = xsync.NewMapOf[*websocket.Conn, bls.Signer]() + signers = xsync.NewMapOf[*websocket.Conn, datasets.Signer]() challenges = xsync.NewMapOf[*websocket.Conn, kosk.Challenge]() } diff --git a/src/plugins/correctness/correctness.go b/src/plugins/correctness/correctness.go index 4f1b7e62..6e1d004d 100644 --- a/src/plugins/correctness/correctness.go +++ b/src/plugins/correctness/correctness.go @@ -30,7 +30,7 @@ type Key struct { } var consensus *lru.Cache[Key, map[bls12381.G1Affine]big.Int] -var signatureCache *lru.Cache[bls12381.G1Affine, []bls.Signature] +var signatureCache *lru.Cache[bls12381.G1Affine, []datasets.Signature] var aggregateCache *lru.Cache[bls12381.G1Affine, bls12381.G1Affine] var DebouncedSaveSignatures func(key bls12381.G1Affine, arg SaveSignatureArgs) var signatureMutex *sync.Mutex @@ -63,7 +63,7 @@ func GetBlockNumber(network string) (*uint64, error) { func RecordSignature( signature bls12381.G1Affine, - signer bls.Signer, + signer datasets.Signer, hash bls12381.G1Affine, info datasets.Correctness, debounce bool) { @@ -121,7 +121,7 @@ func RecordSignature( cached, _ := signatureCache.Get(hash) - packed := bls.Signature{ + packed := datasets.Signature{ Signature: signature, Signer: signer, Processed: false, @@ -156,7 +156,7 @@ func SaveSignatures(args SaveSignatureArgs) { ctx := context.Background() - var newSigners []bls.Signer + var newSigners []datasets.Signer var newSignatures []bls12381.G1Affine var keys [][]byte @@ -260,7 +260,7 @@ func init() { supportedTopics = make(map[[64]byte]bool) var err error - signatureCache, err = lru.New[bls12381.G1Affine, []bls.Signature](LruSize) + signatureCache, err = lru.New[bls12381.G1Affine, []datasets.Signature](LruSize) if err != nil { panic(err) diff --git a/src/plugins/logs/logs.go b/src/plugins/logs/logs.go index c0f28317..007cd8aa 100644 --- a/src/plugins/logs/logs.go +++ b/src/plugins/logs/logs.go @@ -33,7 +33,6 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/go-co-op/gocron/v2" lru "github.com/hashicorp/golang-lru/v2" - "github.com/vmihailenco/msgpack/v5" "golang.org/x/text/cases" "golang.org/x/text/language" ) @@ -51,7 +50,7 @@ type SupportKey struct { } var consensus *lru.Cache[EventKey, map[bls12381.G1Affine]big.Int] -var signatureCache *lru.Cache[bls12381.G1Affine, []bls.Signature] +var signatureCache *lru.Cache[bls12381.G1Affine, []datasets.Signature] var aggregateCache *lru.Cache[bls12381.G1Affine, bls12381.G1Affine] var DebouncedSaveSignatures func(key bls12381.G1Affine, arg SaveSignatureArgs) var signatureMutex *sync.Mutex @@ -102,7 +101,7 @@ type SaveSignatureArgs struct { func RecordSignature( signature bls12381.G1Affine, - signer bls.Signer, + signer datasets.Signer, hash bls12381.G1Affine, info datasets.EventLog, debounce bool, @@ -176,7 +175,7 @@ func RecordSignature( cached, _ := signatureCache.Get(hash) - packed := bls.Signature{ + packed := datasets.Signature{ Signature: signature, Signer: signer, Processed: false, @@ -211,7 +210,7 @@ func SaveSignatures(args SaveSignatureArgs) { ctx := context.Background() - var newSigners []bls.Signer + var newSigners []datasets.Signer var newSignatures []bls12381.G1Affine var keys [][]byte @@ -440,25 +439,18 @@ func createTask(configs []LogConf, chain string) func() { Args: args, } - toHash, err := msgpack.Marshal(&event) - if err != nil { - panic(err) - } - + toHash := event.Sia().Content signature, hash := bls.Sign(*bls.ClientSecretKey, toHash) - compressedSignature := signature.Bytes() - priceReport := datasets.EventLogReport{ - EventLog: event, - Signature: compressedSignature, - } + if conf.Send { + compressedSignature := signature.Bytes() - payload, err := msgpack.Marshal(&priceReport) - if err != nil { - panic(err) - } + priceReport := datasets.EventLogReport{ + EventLog: event, + Signature: compressedSignature, + } - if conf.Send { + payload := priceReport.Sia().Content shared.Send(opcodes.EventLog, payload) } @@ -572,7 +564,7 @@ func init() { supportedEvents = make(map[SupportKey]bool) var err error - signatureCache, err = lru.New[bls12381.G1Affine, []bls.Signature](LruSize) + signatureCache, err = lru.New[bls12381.G1Affine, []datasets.Signature](LruSize) if err != nil { panic(err) diff --git a/src/plugins/uniswap/uniswap.go b/src/plugins/uniswap/uniswap.go index b1d8ea69..4754bd77 100644 --- a/src/plugins/uniswap/uniswap.go +++ b/src/plugins/uniswap/uniswap.go @@ -28,12 +28,12 @@ import ( "github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/go-co-op/gocron/v2" "github.com/puzpuzpuz/xsync/v3" - "github.com/vmihailenco/msgpack/v5" "golang.org/x/text/cases" "golang.org/x/text/language" bls12381 "github.com/consensys/gnark-crypto/ecc/bls12-381" lru "github.com/hashicorp/golang-lru/v2" + sia "github.com/pouya-eghbali/go-sia/v2/pkg" ) var DebouncedSaveSignatures func(key datasets.AssetKey, arg SaveSignatureArgs) @@ -61,7 +61,7 @@ type Token struct { var priceCache map[string]*lru.Cache[uint64, big.Int] var consensus *lru.Cache[datasets.AssetKey, xsync.MapOf[bls12381.G1Affine, big.Int]] -var signatureCache *lru.Cache[bls12381.G1Affine, []bls.Signature] +var signatureCache *lru.Cache[bls12381.G1Affine, []datasets.Signature] var aggregateCache *lru.Cache[bls12381.G1Affine, bls12381.G1Affine] var supportedTokens map[datasets.TokenKey]bool @@ -75,7 +75,7 @@ var lastPrice big.Int func CheckAndCacheSignature( reportedValues *xsync.MapOf[bls12381.G1Affine, big.Int], - signature bls12381.G1Affine, signer bls.Signer, + signature bls12381.G1Affine, signer datasets.Signer, hash bls12381.G1Affine, totalVoted *big.Int) error { signatureMutex.Lock() @@ -83,7 +83,7 @@ func CheckAndCacheSignature( cached, _ := signatureCache.Get(hash) - packed := bls.Signature{ + packed := datasets.Signature{ Signature: signature, Signer: signer, Processed: false, @@ -109,7 +109,7 @@ func CheckAndCacheSignature( // TODO: Can we turn this into a library func? func RecordSignature( signature bls12381.G1Affine, - signer bls.Signer, + signer datasets.Signer, hash bls12381.G1Affine, info datasets.PriceInfo, debounce bool, @@ -259,7 +259,7 @@ func SaveSignatures(args SaveSignatureArgs) { ctx := context.Background() - var newSigners []bls.Signer + var newSigners []datasets.Signer var newSignatures []bls12381.G1Affine var keys [][]byte @@ -473,13 +473,7 @@ func Setup() { os.Exit(1) } - key, err := tokenKey(token) - - if err != nil { - log.Logger.Error("Failed to compute token key.") - os.Exit(1) - } - + key := tokenKey(token) supportedTokens[*key] = true } } @@ -539,10 +533,7 @@ func syncBlock(token Token, caser cases.Caser, key *datasets.TokenKey, blockInx With("Price", priceStr). Info(caser.String(token.Name)) - key, err = tokenKey(token) - if err != nil { - return - } + key = tokenKey(token) priceInfo := datasets.PriceInfo{ Price: *price, @@ -552,27 +543,18 @@ func syncBlock(token Token, caser cases.Caser, key *datasets.TokenKey, blockInx }, } - toHash, err := msgpack.Marshal(&priceInfo) - if err != nil { - log.Logger.Error("Couldn't marshal price info.") - os.Exit(1) - } - + toHash := priceInfo.Sia().Content signature, hash := bls.Sign(*bls.ClientSecretKey, toHash) - compressedSignature := signature.Bytes() - priceReport := datasets.PriceReport{ - PriceInfo: priceInfo, - Signature: compressedSignature, - } + if token.Send && !shared.IsClientSocketClosed { + compressedSignature := signature.Bytes() - payload, err := msgpack.Marshal(&priceReport) - if err != nil { - log.Logger.Error("Couldn't marshal price report.") - panic(err) - } + priceReport := datasets.PriceReport{ + PriceInfo: priceInfo, + Signature: compressedSignature, + } - if token.Send && !shared.IsClientSocketClosed { + payload := priceReport.Sia().Content shared.Send(opcodes.PriceReport, payload) } @@ -603,19 +585,17 @@ func syncBlocks(token Token, key datasets.TokenKey, latest uint64) { } } -func tokenKey(token Token) (*datasets.TokenKey, error) { +func tokenKey(token Token) *datasets.TokenKey { var cross []datasets.TokenKey for _, id := range token.Cross { cross = append(cross, crossTokens[id]) } - toHash, err := msgpack.Marshal(cross) - - if err != nil { - log.Logger.Error("Couldn't hash token key") - return nil, err - } + toHash := new(sia.ArraySia[datasets.TokenKey]). + AddArray8(cross, func(s *sia.ArraySia[datasets.TokenKey], item datasets.TokenKey) { + s.EmbedSia(item.Sia()) + }).Content hash := shake.Shake(toHash) @@ -628,7 +608,7 @@ func tokenKey(token Token) (*datasets.TokenKey, error) { Cross: string(hash), } - return &key, nil + return &key } func createTask(tokens []Token, chain string) func() { @@ -648,12 +628,7 @@ func createTask(tokens []Token, chain string) func() { } // TODO: this can be cached - key, err := tokenKey(token) - - if err != nil { - continue - } - + key := tokenKey(token) tokenLastBlock, exists := lastBlock.Load(*key) if !exists { @@ -725,7 +700,7 @@ func init() { supportedTokens = make(map[datasets.TokenKey]bool) var err error - signatureCache, err = lru.New[bls12381.G1Affine, []bls.Signature](LruSize) + signatureCache, err = lru.New[bls12381.G1Affine, []datasets.Signature](LruSize) if err != nil { log.Logger.Error("Failed to create token price signature cache.")