Skip to content

Commit

Permalink
Add appendRawHistoryNodes api (#2797)
Browse files Browse the repository at this point in the history
* Add AppendRawHistoryNodes api

* Update test init logic
  • Loading branch information
yux0 authored May 9, 2022
1 parent 3dc5143 commit 0c75418
Show file tree
Hide file tree
Showing 11 changed files with 471 additions and 138 deletions.
370 changes: 246 additions & 124 deletions api/adminservice/v1/request_response.pb.go

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions common/metrics/defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -664,6 +664,8 @@ const (

// PersistenceAppendHistoryNodesScope tracks AppendHistoryNodes calls made by service to persistence layer
PersistenceAppendHistoryNodesScope
// PersistenceAppendRawHistoryNodesScope tracks AppendRawHistoryNodes calls made by service to persistence layer
PersistenceAppendRawHistoryNodesScope
// PersistenceDeleteHistoryNodesScope tracks DeleteHistoryNodes calls made by service to persistence layer
PersistenceDeleteHistoryNodesScope
// PersistenceReadHistoryBranchScope tracks ReadHistoryBranch calls made by service to persistence layer
Expand Down Expand Up @@ -1289,6 +1291,7 @@ var ScopeDefs = map[ServiceIdx]map[int]scopeDefinition{
VisibilityPersistenceCountWorkflowExecutionsScope: {operation: "CountWorkflowExecutions", tags: map[string]string{visibilityTypeTagName: unknownValue}},

PersistenceAppendHistoryNodesScope: {operation: "AppendHistoryNodes"},
PersistenceAppendRawHistoryNodesScope: {operation: "AppendRawHistoryNodes"},
PersistenceDeleteHistoryNodesScope: {operation: "DeleteHistoryNodes"},
PersistenceReadHistoryBranchScope: {operation: "ReadHistoryBranch"},
PersistenceReadHistoryBranchReverseScope: {operation: "ReadHistoryBranchReverse"},
Expand Down
24 changes: 24 additions & 0 deletions common/persistence/dataInterfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -719,6 +719,26 @@ type (
Size int
}

// AppendRawHistoryNodesRequest is used to append a batch of raw history nodes
AppendRawHistoryNodesRequest struct {
// The shard to get history node data
ShardID int32
// true if this is the first append request to the branch
IsNewBranch bool
// the info for clean up data in background
Info string
// The branch to be appended
BranchToken []byte
// The batch of events to be appended. The first eventID will become the nodeID of this batch
History *commonpb.DataBlob
// TransactionID for events before these events. For events chaining
PrevTransactionID int64
// requested TransactionID for this write operation. For the same eventID, the node with larger TransactionID always wins
TransactionID int64
// NodeID is the first event id.
NodeID int64
}

// ReadHistoryBranchRequest is used to read a history branch
ReadHistoryBranchRequest struct {
// The shard to get history branch data
Expand Down Expand Up @@ -795,6 +815,8 @@ type (
ReadRawHistoryBranchResponse struct {
// HistoryEventBlobs history event blobs
HistoryEventBlobs []*commonpb.DataBlob
// NodeIDs is the first event id of each history blob
NodeIDs []int64
// Token to read next page if there are more events beyond page size.
// Use this to set NextPageToken on ReadHistoryBranchRequest to read the next page.
// Empty means we have reached the last page, not need to continue
Expand Down Expand Up @@ -1026,6 +1048,8 @@ type (

// AppendHistoryNodes add a node to history node table
AppendHistoryNodes(ctx context.Context, request *AppendHistoryNodesRequest) (*AppendHistoryNodesResponse, error)
// AppendRawHistoryNodes add a node of raw histories to history ndoe table
AppendRawHistoryNodes(ctx context.Context, request *AppendRawHistoryNodesRequest) (*AppendHistoryNodesResponse, error)
// ReadHistoryBranch returns history node data for a branch
ReadHistoryBranch(ctx context.Context, request *ReadHistoryBranchRequest) (*ReadHistoryBranchResponse, error)
// ReadHistoryBranchByBatch returns history node data for a branch ByBatch
Expand Down
15 changes: 15 additions & 0 deletions common/persistence/dataInterfaces_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

103 changes: 94 additions & 9 deletions common/persistence/history_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -426,6 +426,71 @@ func (m *executionManagerImpl) serializeAppendHistoryNodesRequest(
return req, nil
}

func (m *executionManagerImpl) serializeAppendRawHistoryNodesRequest(
request *AppendRawHistoryNodesRequest,
) (*InternalAppendHistoryNodesRequest, error) {
branch, err := m.serializer.HistoryBranchFromBlob(&commonpb.DataBlob{Data: request.BranchToken, EncodingType: enumspb.ENCODING_TYPE_PROTO3})
if err != nil {
return nil, err
}

if len(request.History.Data) == 0 {
return nil, &InvalidPersistenceRequestError{
Msg: fmt.Sprintf("events to be appended cannot be empty"),
}
}
sortAncestors(branch.Ancestors)

nodeID := request.NodeID
if nodeID <= 0 {
return nil, &InvalidPersistenceRequestError{
Msg: fmt.Sprintf("eventID cannot be less than 1"),
}
}
// nodeID will be the first eventID
size := len(request.History.Data)
sizeLimit := m.transactionSizeLimit()
if size > sizeLimit {
return nil, &TransactionSizeLimitError{
Msg: fmt.Sprintf("transaction size of %v bytes exceeds limit of %v bytes", size, sizeLimit),
}
}

req := &InternalAppendHistoryNodesRequest{
IsNewBranch: request.IsNewBranch,
Info: request.Info,
BranchInfo: branch,
Node: InternalHistoryNode{
NodeID: nodeID,
Events: request.History,
PrevTransactionID: request.PrevTransactionID,
TransactionID: request.TransactionID,
},
ShardID: request.ShardID,
}

if req.IsNewBranch {
// TreeInfo is only needed for new branch
treeInfoBlob, err := m.serializer.HistoryTreeInfoToBlob(&persistencespb.HistoryTreeInfo{
BranchInfo: branch,
ForkTime: timestamp.TimeNowPtrUtc(),
Info: request.Info,
}, enumspb.ENCODING_TYPE_PROTO3)
if err != nil {
return nil, err
}
req.TreeInfo = treeInfoBlob
}

if nodeID < GetBeginNodeID(branch) {
return nil, &InvalidPersistenceRequestError{
Msg: fmt.Sprintf("cannot append to ancestors' nodes"),
}
}

return req, nil
}

// AppendHistoryNodes add a node to history node table
func (m *executionManagerImpl) AppendHistoryNodes(
ctx context.Context,
Expand All @@ -445,6 +510,23 @@ func (m *executionManagerImpl) AppendHistoryNodes(
}, err
}

// AppendRawHistoryNodes add raw history nodes to history node table
func (m *executionManagerImpl) AppendRawHistoryNodes(
ctx context.Context,
request *AppendRawHistoryNodesRequest,
) (*AppendHistoryNodesResponse, error) {

req, err := m.serializeAppendRawHistoryNodesRequest(request)
if err != nil {
return nil, err
}

err = m.persistence.AppendHistoryNodes(ctx, req)
return &AppendHistoryNodesResponse{
Size: len(request.History.Data),
}, err
}

// ReadHistoryBranchByBatch returns history node data for a branch by batch
// Pagination is implemented here, the actual minNodeID passing to persistence layer is calculated along with token's LastNodeID
func (m *executionManagerImpl) ReadHistoryBranchByBatch(
Expand Down Expand Up @@ -479,7 +561,7 @@ func (m *executionManagerImpl) ReadRawHistoryBranch(
request *ReadHistoryBranchRequest,
) (*ReadRawHistoryBranchResponse, error) {

dataBlobs, _, token, dataSize, err := m.readRawHistoryBranchAndFilter(ctx, request)
dataBlobs, _, nodeIDs, token, dataSize, err := m.readRawHistoryBranchAndFilter(ctx, request)
if err != nil {
return nil, err
}
Expand All @@ -491,6 +573,7 @@ func (m *executionManagerImpl) ReadRawHistoryBranch(

return &ReadRawHistoryBranchResponse{
HistoryEventBlobs: dataBlobs,
NodeIDs: nodeIDs,
NextPageToken: nextPageToken,
Size: dataSize,
}, nil
Expand Down Expand Up @@ -660,7 +743,7 @@ func (m *executionManagerImpl) readRawHistoryBranchReverse(
func (m *executionManagerImpl) readRawHistoryBranchAndFilter(
ctx context.Context,
request *ReadHistoryBranchRequest,
) ([]*commonpb.DataBlob, []int64, *historyPagingToken, int, error) {
) ([]*commonpb.DataBlob, []int64, []int64, *historyPagingToken, int, error) {

shardID := request.ShardID
branchToken := request.BranchToken
Expand All @@ -669,7 +752,7 @@ func (m *executionManagerImpl) readRawHistoryBranchAndFilter(

branch, err := serialization.HistoryBranchFromBlob(branchToken, enumspb.ENCODING_TYPE_PROTO3.String())
if err != nil {
return nil, nil, nil, 0, err
return nil, nil, nil, nil, 0, err
}
treeID := branch.TreeId
branchID := branch.BranchId
Expand All @@ -692,7 +775,7 @@ func (m *executionManagerImpl) readRawHistoryBranchAndFilter(
defaultLastTransactionID,
)
if err != nil {
return nil, nil, nil, 0, err
return nil, nil, nil, nil, 0, err
}

nodes, token, err := m.readRawHistoryBranch(
Expand All @@ -707,10 +790,10 @@ func (m *executionManagerImpl) readRawHistoryBranchAndFilter(
false,
)
if err != nil {
return nil, nil, nil, 0, err
return nil, nil, nil, nil, 0, err
}
if len(nodes) == 0 && len(request.NextPageToken) == 0 {
return nil, nil, nil, 0, serviceerror.NewNotFound("Workflow execution history not found.")
return nil, nil, nil, nil, 0, serviceerror.NewNotFound("Workflow execution history not found.")
}

nodes, err = m.filterHistoryNodes(
Expand All @@ -719,25 +802,27 @@ func (m *executionManagerImpl) readRawHistoryBranchAndFilter(
nodes,
)
if err != nil {
return nil, nil, nil, 0, err
return nil, nil, nil, nil, 0, err
}

var dataBlobs []*commonpb.DataBlob
transactionIDs := make([]int64, 0, len(nodes))
nodeIDs := make([]int64, 0, len(nodes))
dataSize := 0
if len(nodes) > 0 {
dataBlobs = make([]*commonpb.DataBlob, len(nodes))
for index, node := range nodes {
dataBlobs[index] = node.Events
dataSize += len(node.Events.Data)
transactionIDs = append(transactionIDs, node.TransactionID)
nodeIDs = append(nodeIDs, node.NodeID)
}
lastNode := nodes[len(nodes)-1]
token.LastNodeID = lastNode.NodeID
token.LastTransactionID = lastNode.TransactionID
}

return dataBlobs, transactionIDs, token, dataSize, nil
return dataBlobs, transactionIDs, nodeIDs, token, dataSize, nil
}

func (m *executionManagerImpl) readRawHistoryBranchReverseAndFilter(
Expand Down Expand Up @@ -834,7 +919,7 @@ func (m *executionManagerImpl) readHistoryBranch(
request *ReadHistoryBranchRequest,
) ([]*historypb.HistoryEvent, []*historypb.History, []int64, []byte, int, error) {

dataBlobs, transactionIDs, token, dataSize, err := m.readRawHistoryBranchAndFilter(ctx, request)
dataBlobs, transactionIDs, _, token, dataSize, err := m.readRawHistoryBranchAndFilter(ctx, request)
if err != nil {
return nil, nil, nil, nil, 0, err
}
Expand Down
1 change: 0 additions & 1 deletion common/persistence/persistenceInterface.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ import (

const (
EmptyQueueMessageID = int64(-1)
MinQueueMessageID = EmptyQueueMessageID + 1
MaxQueueMessageID = math.MaxInt64
)

Expand Down
15 changes: 15 additions & 0 deletions common/persistence/persistenceMetricClients.go
Original file line number Diff line number Diff line change
Expand Up @@ -883,6 +883,21 @@ func (p *executionPersistenceClient) AppendHistoryNodes(
return resp, err
}

// AppendRawHistoryNodes add a node to history node table
func (p *executionPersistenceClient) AppendRawHistoryNodes(
ctx context.Context,
request *AppendRawHistoryNodesRequest,
) (*AppendHistoryNodesResponse, error) {
p.metricClient.IncCounter(metrics.PersistenceAppendRawHistoryNodesScope, metrics.PersistenceRequests)
sw := p.metricClient.StartTimer(metrics.PersistenceAppendRawHistoryNodesScope, metrics.PersistenceLatency)
resp, err := p.persistence.AppendRawHistoryNodes(ctx, request)
sw.Stop()
if err != nil {
p.updateErrorMetric(metrics.PersistenceAppendRawHistoryNodesScope, err)
}
return resp, err
}

// ReadHistoryBranch returns history node data for a branch
func (p *executionPersistenceClient) ReadHistoryBranch(
ctx context.Context,
Expand Down
11 changes: 11 additions & 0 deletions common/persistence/persistenceRateLimitedClients.go
Original file line number Diff line number Diff line change
Expand Up @@ -621,6 +621,17 @@ func (p *executionRateLimitedPersistenceClient) AppendHistoryNodes(
return p.persistence.AppendHistoryNodes(ctx, request)
}

// AppendRawHistoryNodes add a node to history node table
func (p *executionRateLimitedPersistenceClient) AppendRawHistoryNodes(
ctx context.Context,
request *AppendRawHistoryNodesRequest,
) (*AppendHistoryNodesResponse, error) {
if ok := p.rateLimiter.Allow(); !ok {
return nil, ErrPersistenceLimitExceeded
}
return p.persistence.AppendRawHistoryNodes(ctx, request)
}

// ReadHistoryBranch returns history node data for a branch
func (p *executionRateLimitedPersistenceClient) ReadHistoryBranch(
ctx context.Context,
Expand Down
Loading

0 comments on commit 0c75418

Please sign in to comment.