Skip to content

Commit

Permalink
Delete stale jobs from consensus queues (#364)
Browse files Browse the repository at this point in the history
  • Loading branch information
Vizualni authored Jul 28, 2022
1 parent 08a1b30 commit 45ae933
Show file tree
Hide file tree
Showing 10 changed files with 220 additions and 64 deletions.
19 changes: 14 additions & 5 deletions proto/consensus/consensus_queue.proto
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
syntax = "proto3";
import "google/protobuf/any.proto";
import "gogoproto/gogo.proto";
import "google/protobuf/timestamp.proto";

package volumefi.paloma.consensus;

Expand All @@ -9,12 +10,20 @@ option go_package = "github.com/palomachain/paloma/x/consensus/types";
// message for storing the queued signed message in the internal queue
message QueuedSignedMessage {
uint64 id = 1;
google.protobuf.Any msg = 2;
bytes bytesToSign = 3;
repeated SignData signData = 4;
repeated Evidence evidence = 5;

PublicAccessData publicAccessData = 6;
int64 addedAtBlockHeight = 2;
google.protobuf.Timestamp addedAt = 3 [
(gogoproto.nullable) = false,
(gogoproto.stdtime) = true
];

google.protobuf.Any msg = 4;
bytes bytesToSign = 5;

repeated SignData signData = 6;
repeated Evidence evidence = 7;

PublicAccessData publicAccessData = 8;
}

message BatchOfConsensusMessages {
Expand Down
4 changes: 3 additions & 1 deletion x/consensus/keeper/attest.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package keeper

import (
sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/palomachain/paloma/x/consensus/keeper/consensus"
)

// CheckAndProcessAttestedMessages is supposed to be used within the
Expand All @@ -18,7 +19,8 @@ func (k Keeper) CheckAndProcessAttestedMessages(ctx sdk.Context) error {
if err != nil {
return err
}
for _, opt := range opts {
for _, queueName := range consensus.SortedQueueNames(ctx, opts) {
opt := opts[queueName]
msgs, err := k.GetMessagesFromQueue(ctx, opt.QueueTypeName, 9999)
if err != nil {
continue
Expand Down
36 changes: 35 additions & 1 deletion x/consensus/keeper/concensus_keeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package keeper

import (
"fmt"
"time"

codectypes "github.com/cosmos/cosmos-sdk/codec/types"
sdk "github.com/cosmos/cosmos-sdk/types"
Expand All @@ -12,7 +13,8 @@ import (
)

const (
encodingDelimiter = byte('|')
encodingDelimiter = byte('|')
deleteJobAfterDuration = 30 * time.Minute
)

// getConsensusQueue gets the consensus queue for the given type.
Expand Down Expand Up @@ -288,3 +290,35 @@ func (k Keeper) queuedMessageToMessageToSign(msg types.QueuedSignedMessageI) *ty
Msg: anyMsg,
}
}

func (k Keeper) RemoveUnexecutedJobs(ctx sdk.Context) error {
now := ctx.BlockTime()

for _, supported := range k.registry.slice {
queuesMap, err := supported.SupportedQueues(ctx)
if err != nil {
return err
}
for _, queueName := range consensus.SortedQueueNames(ctx, queuesMap) {
cq, err := k.getConsensusQueue(ctx, queueName)
if err != nil {
return err
}

jobs, err := cq.GetAll(ctx)
if err != nil {
return err
}

for _, job := range jobs {
if now.Sub(job.GetAddedAt()) >= deleteJobAfterDuration {
if err := cq.Remove(ctx, job.GetId()); err != nil {
return err
}
}
}
}
}

return nil
}
8 changes: 5 additions & 3 deletions x/consensus/keeper/consensus/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,9 +139,11 @@ func (c Queue) Put(ctx sdk.Context, msgs ...ConsensusMsg) error {
return err
}
queuedMsg := &types.QueuedSignedMessage{
Id: newID,
Msg: anyMsg,
SignData: []*types.SignData{},
Id: newID,
Msg: anyMsg,
SignData: []*types.SignData{},
AddedAtBlockHeight: ctx.BlockHeight(),
AddedAt: ctx.BlockTime(),
BytesToSign: c.qo.BytesToSignCalculator(msg, types.Salt{
Nonce: nonce,
}),
Expand Down
9 changes: 9 additions & 0 deletions x/consensus/keeper/consensus/types.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
package consensus

import (
"sort"

sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/palomachain/paloma/util/slice"
"github.com/palomachain/paloma/x/consensus/types"
)

Expand Down Expand Up @@ -40,3 +43,9 @@ type SupportsConsensusQueueAction struct {
type SupportsConsensusQueue interface {
SupportedQueues(ctx sdk.Context) (map[string]SupportsConsensusQueueAction, error)
}

func SortedQueueNames(ctx sdk.Context, queuesMap map[string]SupportsConsensusQueueAction) []string {
queueNames := slice.FromMapKeys(queuesMap)
sort.Strings(queueNames)
return queueNames
}
8 changes: 5 additions & 3 deletions x/consensus/keeper/grpc_query_get_all_queue_names.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"

sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/palomachain/paloma/x/consensus/keeper/consensus"
"github.com/palomachain/paloma/x/consensus/types"
"github.com/vizualni/whoops"
"google.golang.org/grpc/codes"
Expand All @@ -16,10 +17,11 @@ func (k Keeper) GetAllQueueNames(goCtx context.Context, req *types.QueryGetAllQu
}
names := []string{}

ctx := sdk.UnwrapSDKContext(goCtx)

for _, supported := range k.registry.slice {
for queue := range whoops.Must(supported.SupportedQueues(sdk.UnwrapSDKContext(goCtx))) {
names = append(names, queue)
}
queuesMap := whoops.Must(supported.SupportedQueues(ctx))
names = append(names, consensus.SortedQueueNames(ctx, queuesMap)...)
}

return &types.QueryGetAllQueueNamesResponse{
Expand Down
2 changes: 0 additions & 2 deletions x/consensus/keeper/grpc_query_messages_in_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package keeper

import (
"context"
"fmt"

codectypes "github.com/cosmos/cosmos-sdk/codec/types"
sdk "github.com/cosmos/cosmos-sdk/types"
Expand Down Expand Up @@ -73,6 +72,5 @@ func (k Keeper) MessagesInQueue(goCtx context.Context, req *types.QueryMessagesI
}
res.Messages = append(res.Messages, approvedMessage)
}
fmt.Println("DUZINA", len(res.Messages))
return res, nil
}
3 changes: 3 additions & 0 deletions x/consensus/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,5 +173,8 @@ func (am AppModule) BeginBlock(ctx sdk.Context, _ abci.RequestBeginBlock) {
// EndBlock executes all ABCI EndBlock logic respective to the capability module. It
// returns no validator updates.
func (am AppModule) EndBlock(ctx sdk.Context, _ abci.RequestEndBlock) []abci.ValidatorUpdate {
if ctx.BlockHeight()%10 == 9 {
am.keeper.RemoveUnexecutedJobs(ctx)
}
return EndBlocker(ctx, am.keeper)
}
3 changes: 3 additions & 0 deletions x/consensus/types/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
types "github.com/cosmos/cosmos-sdk/codec/types"
sdk "github.com/cosmos/cosmos-sdk/types"
proto "github.com/gogo/protobuf/proto"
"time"
)

type ConsensusQueueType string
Expand All @@ -14,6 +15,8 @@ type QueuedSignedMessageI interface {
proto.Message
GetId() uint64
Nonce() []byte
GetAddedAtBlockHeight() int64
GetAddedAt() time.Time
ConsensusMsg(AnyUnpacker) (ConsensusMsg, error)
GetSignData() []*SignData
AddSignData(*SignData)
Expand Down
Loading

0 comments on commit 45ae933

Please sign in to comment.