Skip to content

Commit

Permalink
Remove deprecated shard info ack level fields (#2884)
Browse files Browse the repository at this point in the history
  • Loading branch information
yycptt authored May 23, 2022
1 parent df43fba commit 6b2b3f2
Show file tree
Hide file tree
Showing 36 changed files with 761 additions and 1,663 deletions.
1,536 changes: 384 additions & 1,152 deletions api/persistence/v1/executions.pb.go

Large diffs are not rendered by default.

16 changes: 3 additions & 13 deletions common/log/tag/tags.go
Original file line number Diff line number Diff line change
Expand Up @@ -497,11 +497,6 @@ func ShardTime(shardTime interface{}) ZapTag {
return NewAnyTag("shard-time", shardTime)
}

// ShardReplicationAck returns tag for ShardReplicationAck
func ShardReplicationAck(shardReplicationAck int64) ZapTag {
return NewInt64("shard-replication-ack", shardReplicationAck)
}

// PreviousShardRangeID returns tag for PreviousShardRangeID
func PreviousShardRangeID(id int64) ZapTag {
return NewInt64("previous-shard-range-id", id)
Expand Down Expand Up @@ -537,14 +532,9 @@ func MaxLevel(lv int64) ZapTag {
return NewInt64("max-level", lv)
}

// ShardTransferAcks returns tag for ShardTransferAcks
func ShardTransferAcks(shardTransferAcks interface{}) ZapTag {
return NewAnyTag("shard-transfer-acks", shardTransferAcks)
}

// ShardTimerAcks returns tag for ShardTimerAcks
func ShardTimerAcks(shardTimerAcks interface{}) ZapTag {
return NewAnyTag("shard-timer-acks", shardTimerAcks)
// ShardQueueAcks returns tag for shard queue ack levels
func ShardQueueAcks(categoryName string, ackLevel interface{}) ZapTag {
return NewAnyTag("shard-"+categoryName+"-queue-acks", ackLevel)
}

// task queue processor
Expand Down
11 changes: 3 additions & 8 deletions common/persistence/persistence-tests/persistenceTestBase.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ import (
"go.temporal.io/server/common/persistence/sql/sqlplugin/mysql"
"go.temporal.io/server/common/persistence/sql/sqlplugin/postgresql"
"go.temporal.io/server/common/persistence/sql/sqlplugin/sqlite"
"go.temporal.io/server/common/primitives/timestamp"
"go.temporal.io/server/common/resolver"
"go.temporal.io/server/common/searchattribute"
"go.temporal.io/server/environment"
Expand Down Expand Up @@ -238,13 +237,9 @@ func (s *TestBase) Setup(clusterMetadataConfig *cluster.Config) {
s.ReadLevel = 0
s.ReplicationReadLevel = 0
s.ShardInfo = &persistencespb.ShardInfo{
ShardId: shardID,
RangeId: 0,
TransferAckLevel: 0,
ReplicationAckLevel: 0,
TimerAckLevelTime: timestamp.TimePtr(time.Unix(0, 0).UTC()),
ClusterTimerAckLevel: map[string]*time.Time{},
ClusterTransferAckLevel: map[string]int64{clusterName: 0},
ShardId: shardID,
RangeId: 0,
QueueAckLevels: make(map[int32]*persistencespb.QueueAckLevel), // TODO: is this needed?
}

s.TaskIDGenerator = &TestTransferTaskIDGenerator{}
Expand Down
52 changes: 29 additions & 23 deletions common/persistence/persistence-tests/shardPersistenceTest.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
persistencespb "go.temporal.io/server/api/persistence/v1"
p "go.temporal.io/server/common/persistence"
"go.temporal.io/server/common/primitives/timestamp"
"go.temporal.io/server/service/history/tasks"
)

type (
Expand Down Expand Up @@ -114,11 +115,18 @@ func (s *ShardPersistenceSuite) TestUpdateShard() {
updatedInfo := copyShardInfo(shardInfo)
updatedInfo.Owner = updatedOwner
updatedInfo.RangeId = updatedRangeID
updatedInfo.TransferAckLevel = updatedTransferAckLevel
updatedInfo.ReplicationAckLevel = updatedReplicationAckLevel
updatedInfo.QueueAckLevels = make(map[int32]*persistencespb.QueueAckLevel)
updatedInfo.QueueAckLevels[tasks.CategoryTransfer.ID()] = &persistencespb.QueueAckLevel{
AckLevel: updatedTransferAckLevel,
}
updatedInfo.QueueAckLevels[tasks.CategoryReplication.ID()] = &persistencespb.QueueAckLevel{
AckLevel: updatedReplicationAckLevel,
}
updatedInfo.StolenSinceRenew = updatedStolenSinceRenew
updatedTimerAckLevel := timestamp.TimeNowPtrUtc()
updatedInfo.TimerAckLevelTime = updatedTimerAckLevel
updatedTimerAckLevel := time.Now().UTC()
updatedInfo.QueueAckLevels[tasks.CategoryTimer.ID()] = &persistencespb.QueueAckLevel{
AckLevel: updatedTimerAckLevel.UnixNano(),
}
err2 := s.UpdateShard(s.ctx, updatedInfo, shardInfo.GetRangeId())
s.Nil(err2)

Expand All @@ -127,16 +135,20 @@ func (s *ShardPersistenceSuite) TestUpdateShard() {
s.NotNil(info1)
s.Equal(updatedOwner, info1.Owner)
s.Equal(updatedRangeID, info1.GetRangeId())
s.Equal(updatedTransferAckLevel, info1.TransferAckLevel)
s.Equal(updatedReplicationAckLevel, info1.ReplicationAckLevel)
s.Equal(updatedTransferAckLevel, info1.QueueAckLevels[tasks.CategoryTransfer.ID()].AckLevel)
s.Equal(updatedReplicationAckLevel, info1.QueueAckLevels[tasks.CategoryReplication.ID()].AckLevel)
s.Equal(updatedStolenSinceRenew, info1.StolenSinceRenew)
info1timerAckLevelTime := info1.TimerAckLevelTime
s.EqualTimes(*updatedTimerAckLevel, *info1timerAckLevelTime)
s.EqualTimes(updatedTimerAckLevel, timestamp.UnixOrZeroTime(info1.QueueAckLevels[tasks.CategoryTimer.ID()].AckLevel))

failedUpdateInfo := copyShardInfo(shardInfo)
failedUpdateInfo.Owner = "failed_owner"
failedUpdateInfo.TransferAckLevel = int64(4000)
failedUpdateInfo.ReplicationAckLevel = int64(5000)
failedUpdateInfo.QueueAckLevels = make(map[int32]*persistencespb.QueueAckLevel)
failedUpdateInfo.QueueAckLevels[tasks.CategoryTransfer.ID()] = &persistencespb.QueueAckLevel{
AckLevel: int64(4000),
}
failedUpdateInfo.QueueAckLevels[tasks.CategoryReplication.ID()] = &persistencespb.QueueAckLevel{
AckLevel: int64(5000),
}
err4 := s.UpdateShard(s.ctx, failedUpdateInfo, shardInfo.GetRangeId())
s.NotNil(err4)
s.IsType(&p.ShardOwnershipLostError{}, err4)
Expand All @@ -146,23 +158,17 @@ func (s *ShardPersistenceSuite) TestUpdateShard() {
s.NotNil(info2)
s.Equal(updatedOwner, info2.Owner)
s.Equal(updatedRangeID, info2.GetRangeId())
s.Equal(updatedTransferAckLevel, info2.TransferAckLevel)
s.Equal(updatedReplicationAckLevel, info2.ReplicationAckLevel)
s.Equal(updatedTransferAckLevel, info2.QueueAckLevels[tasks.CategoryTransfer.ID()].AckLevel)
s.Equal(updatedReplicationAckLevel, info2.QueueAckLevels[tasks.CategoryReplication.ID()].AckLevel)
s.Equal(updatedStolenSinceRenew, info2.StolenSinceRenew)

info1timerAckLevelTime = info1.TimerAckLevelTime
s.EqualTimes(*updatedTimerAckLevel, *info1timerAckLevelTime)
}

func copyShardInfo(sourceInfo *persistencespb.ShardInfo) *persistencespb.ShardInfo {
return &persistencespb.ShardInfo{
ShardId: sourceInfo.GetShardId(),
Owner: sourceInfo.Owner,
RangeId: sourceInfo.GetRangeId(),
TransferAckLevel: sourceInfo.TransferAckLevel,
ReplicationAckLevel: sourceInfo.ReplicationAckLevel,
StolenSinceRenew: sourceInfo.StolenSinceRenew,
TimerAckLevelTime: sourceInfo.TimerAckLevelTime,
VisibilityAckLevel: sourceInfo.VisibilityAckLevel,
ShardId: sourceInfo.GetShardId(),
Owner: sourceInfo.Owner,
RangeId: sourceInfo.GetRangeId(),
StolenSinceRenew: sourceInfo.StolenSinceRenew,
QueueAckLevels: sourceInfo.QueueAckLevels,
}
}
17 changes: 0 additions & 17 deletions common/persistence/serialization/serializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ package serialization
import (
"fmt"
"reflect"
"time"

"github.com/gogo/protobuf/proto"
commonpb "go.temporal.io/api/common/v1"
Expand Down Expand Up @@ -293,22 +292,6 @@ func (t *serializerImpl) ShardInfoFromBlob(data *commonpb.DataBlob, clusterName
return nil, err
}

if len(shardInfo.GetClusterTransferAckLevel()) == 0 {
shardInfo.ClusterTransferAckLevel = map[string]int64{
clusterName: shardInfo.GetTransferAckLevel(),
}
}

if len(shardInfo.GetClusterTimerAckLevel()) == 0 {
shardInfo.ClusterTimerAckLevel = map[string]*time.Time{
clusterName: shardInfo.GetTimerAckLevelTime(),
}
}

if shardInfo.GetClusterReplicationLevel() == nil {
shardInfo.ClusterReplicationLevel = make(map[string]int64)
}

if shardInfo.GetReplicationDlqAckLevel() == nil {
shardInfo.ReplicationDlqAckLevel = make(map[string]int64)
}
Expand Down
21 changes: 14 additions & 7 deletions proto/internal/temporal/server/api/persistence/v1/executions.proto
Original file line number Diff line number Diff line change
Expand Up @@ -44,19 +44,26 @@ message ShardInfo {
int32 shard_id = 1;
int64 range_id = 2;
string owner = 3;
int64 replication_ack_level = 4 [deprecated = true];
int64 transfer_ack_level = 5 [deprecated = true];
// int64 replication_ack_level = 4 [deprecated = true];
// int64 transfer_ack_level = 5 [deprecated = true];
reserved 4;
reserved 5;
// (-- api-linter: core::0140::prepositions=disabled
// aip.dev/not-precedent: "since" is needed here. --)
int32 stolen_since_renew = 6;
google.protobuf.Timestamp update_time = 7 [(gogoproto.stdtime) = true];
google.protobuf.Timestamp timer_ack_level_time = 8 [(gogoproto.stdtime) = true, deprecated = true];
// google.protobuf.Timestamp timer_ack_level_time = 8 [(gogoproto.stdtime) = true, deprecated = true];
reserved 8;
int64 namespace_notification_version = 9;
map<string, int64> cluster_transfer_ack_level = 10 [deprecated = true];
map<string, google.protobuf.Timestamp> cluster_timer_ack_level = 11 [(gogoproto.stdtime) = true, deprecated = true];
map<string, int64> cluster_replication_level = 12 [deprecated = true];
// map<string, int64> cluster_transfer_ack_level = 10 [deprecated = true];
// map<string, google.protobuf.Timestamp> cluster_timer_ack_level = 11 [(gogoproto.stdtime) = true, deprecated = true];
// map<string, int64> cluster_replication_level = 12 [deprecated = true];
reserved 10;
reserved 11;
reserved 12;
map<string, int64> replication_dlq_ack_level = 13;
int64 visibility_ack_level = 14 [deprecated = true];
// int64 visibility_ack_level = 14 [deprecated = true];
reserved 14;
reserved 15;
// Map from task category to ack levels of the corresponding queue processor
map<int32, QueueAckLevel> queue_ack_levels = 16;
Expand Down
5 changes: 2 additions & 3 deletions service/history/historyEngine2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,9 +130,8 @@ func (s *engine2Suite) SetupTest() {
s.controller,
&persistence.ShardInfoWithFailover{
ShardInfo: &persistencespb.ShardInfo{
ShardId: 1,
RangeId: 1,
TransferAckLevel: 0,
ShardId: 1,
RangeId: 1,
}},
s.config,
)
Expand Down
5 changes: 2 additions & 3 deletions service/history/historyEngine3_eventsv2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,9 +114,8 @@ func (s *engine3Suite) SetupTest() {
s.mockShard = shard.NewTestContext(
s.controller,
&p.ShardInfoWithFailover{ShardInfo: &persistencespb.ShardInfo{
ShardId: 1,
RangeId: 1,
TransferAckLevel: 0,
ShardId: 1,
RangeId: 1,
}},
s.config,
)
Expand Down
5 changes: 2 additions & 3 deletions service/history/historyEngine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,9 +139,8 @@ func (s *engineSuite) SetupTest() {
s.controller,
&persistence.ShardInfoWithFailover{
ShardInfo: &persistencespb.ShardInfo{
ShardId: 1,
RangeId: 1,
TransferAckLevel: 0,
ShardId: 1,
RangeId: 1,
}},
s.config,
)
Expand Down
5 changes: 2 additions & 3 deletions service/history/nDCActivityReplicator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,9 +109,8 @@ func (s *activityReplicatorSuite) SetupTest() {
s.controller,
&persistence.ShardInfoWithFailover{
ShardInfo: &persistencespb.ShardInfo{
ShardId: 1,
RangeId: 1,
TransferAckLevel: 0,
ShardId: 1,
RangeId: 1,
}},
tests.NewDynamicConfig(),
)
Expand Down
5 changes: 2 additions & 3 deletions service/history/nDCBranchMgr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,9 +88,8 @@ func (s *nDCBranchMgrSuite) SetupTest() {
s.controller,
&persistence.ShardInfoWithFailover{
ShardInfo: &persistencespb.ShardInfo{
ShardId: 10,
RangeId: 1,
TransferAckLevel: 0,
ShardId: 10,
RangeId: 1,
}},
tests.NewDynamicConfig(),
)
Expand Down
5 changes: 2 additions & 3 deletions service/history/nDCConflictResolver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,8 @@ func (s *nDCConflictResolverSuite) SetupTest() {
s.controller,
&persistence.ShardInfoWithFailover{
ShardInfo: &persistencespb.ShardInfo{
ShardId: 10,
RangeId: 1,
TransferAckLevel: 0,
ShardId: 10,
RangeId: 1,
}},
tests.NewDynamicConfig(),
)
Expand Down
5 changes: 2 additions & 3 deletions service/history/nDCHistoryReplicator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,9 +94,8 @@ func (s *nDCHistoryReplicatorSuite) SetupTest() {
s.controller,
&persistence.ShardInfoWithFailover{
ShardInfo: &persistencespb.ShardInfo{
ShardId: 10,
RangeId: 1,
TransferAckLevel: 0,
ShardId: 10,
RangeId: 1,
}},
tests.NewDynamicConfig(),
)
Expand Down
5 changes: 2 additions & 3 deletions service/history/nDCStateRebuilder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,8 @@ func (s *nDCStateRebuilderSuite) SetupTest() {
s.controller,
&persistence.ShardInfoWithFailover{
ShardInfo: &persistencespb.ShardInfo{
ShardId: 10,
RangeId: 1,
TransferAckLevel: 0,
ShardId: 10,
RangeId: 1,
}},
tests.NewDynamicConfig(),
)
Expand Down
5 changes: 2 additions & 3 deletions service/history/nDCTransactionMgr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,8 @@ func (s *nDCTransactionMgrSuite) SetupTest() {
s.controller,
&persistence.ShardInfoWithFailover{
ShardInfo: &persistencespb.ShardInfo{
ShardId: 10,
RangeId: 1,
TransferAckLevel: 0,
ShardId: 10,
RangeId: 1,
}},
tests.NewDynamicConfig(),
)
Expand Down
5 changes: 2 additions & 3 deletions service/history/nDCWorkflowResetter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,8 @@ func (s *nDCWorkflowResetterSuite) SetupTest() {
s.controller,
&persistence.ShardInfoWithFailover{
ShardInfo: &persistencespb.ShardInfo{
ShardId: 10,
RangeId: 1,
TransferAckLevel: 0,
ShardId: 10,
RangeId: 1,
}},
tests.NewDynamicConfig(),
)
Expand Down
26 changes: 18 additions & 8 deletions service/history/queueAckMgr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,10 +107,15 @@ func (s *queueAckMgrSuite) SetupTest() {
ShardInfo: &persistencespb.ShardInfo{
ShardId: 1,
RangeId: 1,
ClusterTimerAckLevel: map[string]*time.Time{
cluster.TestCurrentClusterName: timestamp.TimeNowPtrUtcAddSeconds(-8),
cluster.TestAlternativeClusterName: timestamp.TimeNowPtrUtcAddSeconds(-10),
}},
QueueAckLevels: map[int32]*persistencespb.QueueAckLevel{
tasks.CategoryTimer.ID(): {
ClusterAckLevel: map[string]int64{
cluster.TestCurrentClusterName: timestamp.TimeNowPtrUtcAddSeconds(-8).UnixNano(),
cluster.TestAlternativeClusterName: timestamp.TimeNowPtrUtcAddSeconds(-10).UnixNano(),
},
},
},
},
},
config,
)
Expand Down Expand Up @@ -334,10 +339,15 @@ func (s *queueFailoverAckMgrSuite) SetupTest() {
ShardInfo: &persistencespb.ShardInfo{
ShardId: 1,
RangeId: 1,
ClusterTimerAckLevel: map[string]*time.Time{
cluster.TestCurrentClusterName: timestamp.TimeNowPtrUtc(),
cluster.TestAlternativeClusterName: timestamp.TimeNowPtrUtcAddSeconds(-10),
}},
QueueAckLevels: map[int32]*persistencespb.QueueAckLevel{
tasks.CategoryTimer.ID(): {
ClusterAckLevel: map[string]int64{
cluster.TestCurrentClusterName: timestamp.TimeNowPtrUtc().UnixNano(),
cluster.TestAlternativeClusterName: timestamp.TimeNowPtrUtcAddSeconds(-10).UnixNano(),
},
},
},
},
},
config,
)
Expand Down
5 changes: 2 additions & 3 deletions service/history/replication/ack_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,8 @@ func (s *ackManagerSuite) SetupTest() {
s.controller,
&persistence.ShardInfoWithFailover{
ShardInfo: &persistencespb.ShardInfo{
ShardId: 0,
RangeId: 1,
TransferAckLevel: 0,
ShardId: 0,
RangeId: 1,
}},
tests.NewDynamicConfig(),
)
Expand Down
1 change: 0 additions & 1 deletion service/history/replication/dlq_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,6 @@ func (s *dlqHandlerSuite) SetupTest() {
ShardInfo: &persistencespb.ShardInfo{
ShardId: 0,
RangeId: 1,
ReplicationAckLevel: 0,
ReplicationDlqAckLevel: map[string]int64{cluster.TestAlternativeClusterName: persistence.EmptyQueueMessageID},
}},
tests.NewDynamicConfig(),
Expand Down
Loading

0 comments on commit 6b2b3f2

Please sign in to comment.