-
Notifications
You must be signed in to change notification settings - Fork 202
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[4/N][multi quorum ejection] Create operator quorum intervals #304
Changes from 4 commits
b2b00a3
b41e1d8
2dbf49a
966596a
d558aba
6b29570
b87a07a
bca97d4
39e365c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,196 @@ | ||
package dataapi | ||
|
||
import "fmt" | ||
|
||
type OperatorQuorum struct { | ||
Operator string | ||
QuorumNumbers []byte | ||
BlockNumber uint32 | ||
} | ||
|
||
// Representing an interval [StartBlock, EndBlock] (inclusive). | ||
type BlockInterval struct { | ||
StartBlock uint32 | ||
EndBlock uint32 | ||
} | ||
|
||
// OperatorQuorumIntervals[op][q] is a sequence of increasing and non-overlapping | ||
// intervals during which the operator "op" is registered in quorum "q". | ||
type OperatorQuorumIntervals map[string]map[uint8][]BlockInterval | ||
|
||
// GetQuorums returns the quorums the operator is registered in at the given block number. | ||
func (oqi OperatorQuorumIntervals) GetQuorums(operatorId string, blockNum uint32) []uint8 { | ||
quorums := make([]uint8, 0) | ||
for q, intervals := range oqi[operatorId] { | ||
// Note: if len(intervals) is large, we can perform binary search here. | ||
// In practice it should be quite small given that the quorum change is | ||
// not frequent, so search it with brute force here. | ||
live := false | ||
for _, interval := range intervals { | ||
if interval.StartBlock > blockNum { | ||
break | ||
} | ||
if blockNum <= interval.EndBlock { | ||
live = true | ||
break | ||
} | ||
} | ||
if live { | ||
quorums = append(quorums, q) | ||
} | ||
} | ||
return quorums | ||
} | ||
|
||
// CreateOperatorQuorumIntervals creates OperatorQuorumIntervals that are within the | ||
// the block interval [startBlock, endBlock] for operators. | ||
// | ||
// The parameters: | ||
// - startBlock, endBlock: specifying the block interval that's of interest. | ||
// Requires: startBlock <= endBlock. | ||
// - operatorInitialQuorum: the initial quorums at startBlock that operators were | ||
// registered in. | ||
// Requires: operatorInitialQuorum[op] is non-empty for each operator "op". | ||
// - addedToQuorum, removedFromQuorum: a sequence of events that added/removed operators | ||
// to/from quorums. | ||
// Requires: | ||
// 1) the block numbers for all events are in range [startBlock+1, endBlock]; | ||
// 2) the events are in ascending order by block number for each operator "op". | ||
func CreateOperatorQuorumIntervals( | ||
startBlock uint32, | ||
endBlock uint32, | ||
operatorInitialQuorum map[string][]uint8, | ||
addedToQuorum map[string][]*OperatorQuorum, | ||
removedFromQuorum map[string][]*OperatorQuorum, | ||
) (OperatorQuorumIntervals, error) { | ||
if startBlock > endBlock { | ||
msg := "the startBlock must be no less than endBlock, but found " + | ||
"startBlock: %d, endBlock: %d" | ||
return nil, fmt.Errorf(msg, startBlock, endBlock) | ||
} | ||
operatorQuorumIntervals := make(OperatorQuorumIntervals) | ||
addedToQuorumErr := "cannot add operator %s to quorum %d at block number %d, " + | ||
"the operator is already in the quorum since block number: %d" | ||
for op, initialQuorums := range operatorInitialQuorum { | ||
if len(initialQuorums) == 0 { | ||
return nil, fmt.Errorf("operator %s must be in at least one quorum at block %d", op, startBlock) | ||
} | ||
operatorQuorumIntervals[op] = make(map[uint8][]BlockInterval) | ||
openQuorum := make(map[uint8]uint32) | ||
for _, q := range initialQuorums { | ||
openQuorum[q] = startBlock | ||
} | ||
added := addedToQuorum[op] | ||
removed := removedFromQuorum[op] | ||
eventErr := validateQuorumEvents(added, removed, startBlock, endBlock) | ||
if eventErr != nil { | ||
return nil, eventErr | ||
} | ||
i, j := 0, 0 | ||
for i < len(added) && j < len(removed) { | ||
// TODO(jianoaix): Having quorum addition and removal in the same block is a valid case. | ||
// Come up a followup fix to handle this special case. | ||
if added[i].BlockNumber == removed[j].BlockNumber { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah we need to consider the transaction indexes of the transactions or the log index if they happened in the same transaction There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The idea in my mind was to make a RPC call to the eth node to get the quorums after applying all txns in a block, so we could avoid having to know the details inside a block. Quorum opt-in/out are not frequent events, even more so the opt-in/out happening in a same block, so should be small cost to handle. |
||
msg := "Not yet supported: operator was adding and removing quorums at the " + | ||
"same block. operator: %s, block number: %d" | ||
return nil, fmt.Errorf(msg, op, added[i].BlockNumber) | ||
} | ||
if added[i].BlockNumber < removed[j].BlockNumber { | ||
for _, q := range added[i].QuorumNumbers { | ||
start, ok := openQuorum[q] | ||
if ok { | ||
return nil, fmt.Errorf(addedToQuorumErr, op, q, added[i].BlockNumber, start) | ||
} | ||
openQuorum[q] = added[i].BlockNumber | ||
} | ||
i++ | ||
} else { | ||
err := removeQuorums(removed[j], openQuorum, operatorQuorumIntervals) | ||
if err != nil { | ||
return nil, err | ||
} | ||
j++ | ||
} | ||
} | ||
for ; i < len(added); i++ { | ||
for _, q := range added[i].QuorumNumbers { | ||
start, ok := openQuorum[q] | ||
if ok { | ||
return nil, fmt.Errorf(addedToQuorumErr, op, q, added[i].BlockNumber, start) | ||
} | ||
openQuorum[q] = added[i].BlockNumber | ||
} | ||
} | ||
for ; j < len(removed); j++ { | ||
err := removeQuorums(removed[j], openQuorum, operatorQuorumIntervals) | ||
if err != nil { | ||
return nil, err | ||
} | ||
} | ||
for q, start := range openQuorum { | ||
interval := BlockInterval{ | ||
StartBlock: start, | ||
EndBlock: endBlock, | ||
} | ||
_, ok := operatorQuorumIntervals[op][q] | ||
if !ok { | ||
operatorQuorumIntervals[op][q] = make([]BlockInterval, 0) | ||
} | ||
operatorQuorumIntervals[op][q] = append(operatorQuorumIntervals[op][q], interval) | ||
} | ||
} | ||
|
||
return operatorQuorumIntervals, nil | ||
} | ||
|
||
// removeQuorums handles a quorum removal event, which marks the end of membership in a | ||
// quorum, so it'll form a block interval. | ||
func removeQuorums(operatorQuorum *OperatorQuorum, openQuorum map[uint8]uint32, result OperatorQuorumIntervals) error { | ||
op := operatorQuorum.Operator | ||
for _, q := range operatorQuorum.QuorumNumbers { | ||
start, ok := openQuorum[q] | ||
if !ok { | ||
msg := "cannot remove a quorum %d, the operator %s is not yet in the quorum " + | ||
"at block %d" | ||
return fmt.Errorf(msg, q, op, operatorQuorum.BlockNumber) | ||
} | ||
if start >= operatorQuorum.BlockNumber { | ||
msg := "deregistration block number %d must be strictly greater than its " + | ||
"registration block number %d, for operator %s, quorum %d" | ||
return fmt.Errorf(msg, operatorQuorum.BlockNumber, start, op, q) | ||
} | ||
interval := BlockInterval{ | ||
StartBlock: start, | ||
// The operator is NOT live at the block it's deregistered. | ||
EndBlock: operatorQuorum.BlockNumber - 1, | ||
} | ||
_, ok = result[op][q] | ||
if !ok { | ||
result[op][q] = make([]BlockInterval, 0) | ||
} | ||
result[op][q] = append(result[op][q], interval) | ||
delete(openQuorum, q) | ||
} | ||
return nil | ||
} | ||
|
||
// validateQuorumEvents validates the operator quorum events have the desired block numbers and | ||
// are in ascending order by block number. | ||
func validateQuorumEvents(added []*OperatorQuorum, removed []*OperatorQuorum, startBlock, endBlock uint32) error { | ||
validate := func(events []*OperatorQuorum) error { | ||
for i := range events { | ||
if events[i].BlockNumber <= startBlock || events[i].BlockNumber > endBlock { | ||
return fmt.Errorf("quorum events must be in range [%d, %d]", startBlock+1, endBlock) | ||
} | ||
if i > 0 && events[i].BlockNumber < events[i-1].BlockNumber { | ||
return fmt.Errorf("quorum events must be in ascending order by block number") | ||
} | ||
} | ||
return nil | ||
} | ||
err := validate(added) | ||
if err != nil { | ||
return err | ||
} | ||
return validate(removed) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This will be removed once #293 is in.