Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update interfaces and structs for admin use #2533

Merged
merged 2 commits into from
Feb 23, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
150 changes: 75 additions & 75 deletions common/persistence/serialization/serializer.go

Large diffs are not rendered by default.

14 changes: 7 additions & 7 deletions service/history/queueProcessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,11 +103,11 @@ func newQueueProcessorBase(

var taskProcessor *taskProcessor
if !options.EnablePriorityTaskProcessor() {
taskProcessorOptions := taskProcessorOptions{
queueSize: options.BatchSize(),
workerCount: options.WorkerCount(),
taskProcessorOptions := TaskProcessorOptions{
QueueSize: options.BatchSize(),
WorkerCount: options.WorkerCount(),
}
taskProcessor = newTaskProcessor(taskProcessorOptions, shard, historyCache, logger)
taskProcessor = NewTaskProcessor(taskProcessorOptions, shard, historyCache, logger)
}

p := &queueProcessorBase{
Expand Down Expand Up @@ -141,7 +141,7 @@ func (p *queueProcessorBase) Start() {
defer p.logger.Info("", tag.LifeCycleStarted, tag.ComponentTransferQueue)

if p.taskProcessor != nil {
p.taskProcessor.start()
p.taskProcessor.Start()
}
p.shutdownWG.Add(1)
p.notifyNewTask()
Expand All @@ -164,7 +164,7 @@ func (p *queueProcessorBase) Stop() {
}

if p.taskProcessor != nil {
p.taskProcessor.stop()
p.taskProcessor.Stop()
}
}

Expand Down Expand Up @@ -280,7 +280,7 @@ func (p *queueProcessorBase) submitTask(
) bool {

return p.taskProcessor.addTask(
newTaskInfo(
NewTaskInfo(
p.processor,
taskInfo,
initializeLoggerForTask(p.shard.GetShardID(), taskInfo, p.logger),
Expand Down
22 changes: 11 additions & 11 deletions service/history/taskProcessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,9 @@ const (
)

type (
taskProcessorOptions struct {
queueSize int
workerCount int
TaskProcessorOptions struct {
QueueSize int
WorkerCount int
}

taskInfo struct {
Expand Down Expand Up @@ -92,7 +92,7 @@ type (
}
)

func newTaskInfo(
func NewTaskInfo(
processor taskExecutor,
task tasks.Task,
logger log.Logger,
Expand All @@ -109,36 +109,36 @@ func newTaskInfo(
}
}

func newTaskProcessor(
options taskProcessorOptions,
func NewTaskProcessor(
options TaskProcessorOptions,
shard shard.Context,
historyCache workflow.Cache,
logger log.Logger,
) *taskProcessor {

var workerNotificationChs []chan struct{}
for index := 0; index < options.workerCount; index++ {
for index := 0; index < options.WorkerCount; index++ {
workerNotificationChs = append(workerNotificationChs, make(chan struct{}, 1))
}

base := &taskProcessor{
shard: shard,
cache: historyCache,
shutdownCh: make(chan struct{}),
tasksCh: make(chan *taskInfo, options.queueSize),
tasksCh: make(chan *taskInfo, options.QueueSize),
config: shard.GetConfig(),
logger: logger,
metricsClient: shard.GetMetricsClient(),
timeSource: shard.GetTimeSource(),
workerNotificationChans: workerNotificationChs,
retryPolicy: common.CreatePersistenceRetryPolicy(),
numOfWorker: options.workerCount,
numOfWorker: options.WorkerCount,
}

return base
}

func (t *taskProcessor) start() {
func (t *taskProcessor) Start() {
for i := 0; i < t.numOfWorker; i++ {
t.workerWG.Add(1)
notificationChan := t.workerNotificationChans[i]
Expand All @@ -147,7 +147,7 @@ func (t *taskProcessor) start() {
t.logger.Info("Task processor started.")
}

func (t *taskProcessor) stop() {
func (t *taskProcessor) Stop() {
close(t.shutdownCh)
if success := common.AwaitWaitGroup(&t.workerWG, time.Minute); !success {
t.logger.Warn("Task processor timed out on shutdown.")
Expand Down
30 changes: 15 additions & 15 deletions service/history/taskProcessor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,11 +113,11 @@ func (s *taskProcessorSuite) SetupTest() {
logger: s.logger,
metricsClient: s.mockShard.GetMetricsClient(),
}
options := taskProcessorOptions{
queueSize: s.mockShard.GetConfig().TimerTaskBatchSize() * s.mockShard.GetConfig().TimerTaskWorkerCount(),
workerCount: s.mockShard.GetConfig().TimerTaskWorkerCount(),
options := TaskProcessorOptions{
QueueSize: s.mockShard.GetConfig().TimerTaskBatchSize() * s.mockShard.GetConfig().TimerTaskWorkerCount(),
WorkerCount: s.mockShard.GetConfig().TimerTaskWorkerCount(),
}
s.taskProcessor = newTaskProcessor(options, s.mockShard, h.historyCache, s.logger)
s.taskProcessor = NewTaskProcessor(options, s.mockShard, h.historyCache, s.logger)
}

func (s *taskProcessorSuite) TearDownTest() {
Expand All @@ -138,7 +138,7 @@ func (s *taskProcessorSuite) TestProcessTaskAndAck_ShutDown() {
}

func (s *taskProcessorSuite) TestProcessTaskAndAck_NamespaceErrRetry_ProcessNoErr() {
task := newTaskInfo(s.mockProcessor, &taskForTest{Key: tasks.Key{TaskID: 12345, FireTime: time.Now().UTC()}}, s.logger)
task := NewTaskInfo(s.mockProcessor, &taskForTest{Key: tasks.Key{TaskID: 12345, FireTime: time.Now().UTC()}}, s.logger)
var taskFilterErr taskFilter = func(task tasks.Task) (bool, error) {
return false, errors.New("some random error")
}
Expand All @@ -157,7 +157,7 @@ func (s *taskProcessorSuite) TestProcessTaskAndAck_NamespaceErrRetry_ProcessNoEr
}

func (s *taskProcessorSuite) TestProcessTaskAndAck_NamespaceFalse_ProcessNoErr() {
task := newTaskInfo(s.mockProcessor, &taskForTest{Key: tasks.Key{TaskID: 12345, FireTime: time.Now().UTC()}}, s.logger)
task := NewTaskInfo(s.mockProcessor, &taskForTest{Key: tasks.Key{TaskID: 12345, FireTime: time.Now().UTC()}}, s.logger)
task.shouldProcessTask = false
var taskFilter taskFilter = func(task tasks.Task) (bool, error) {
return false, nil
Expand All @@ -173,7 +173,7 @@ func (s *taskProcessorSuite) TestProcessTaskAndAck_NamespaceFalse_ProcessNoErr()
}

func (s *taskProcessorSuite) TestProcessTaskAndAck_NamespaceTrue_ProcessNoErr() {
task := newTaskInfo(s.mockProcessor, &taskForTest{Key: tasks.Key{TaskID: 12345, FireTime: time.Now().UTC()}}, s.logger)
task := NewTaskInfo(s.mockProcessor, &taskForTest{Key: tasks.Key{TaskID: 12345, FireTime: time.Now().UTC()}}, s.logger)
var taskFilter taskFilter = func(task tasks.Task) (bool, error) {
return true, nil
}
Expand All @@ -190,7 +190,7 @@ func (s *taskProcessorSuite) TestProcessTaskAndAck_NamespaceTrue_ProcessNoErr()

func (s *taskProcessorSuite) TestProcessTaskAndAck_NamespaceTrue_ProcessErrNoErr() {
err := errors.New("some random err")
task := newTaskInfo(s.mockProcessor, &taskForTest{Key: tasks.Key{TaskID: 12345, FireTime: time.Now().UTC()}}, s.logger)
task := NewTaskInfo(s.mockProcessor, &taskForTest{Key: tasks.Key{TaskID: 12345, FireTime: time.Now().UTC()}}, s.logger)
var taskFilter taskFilter = func(task tasks.Task) (bool, error) {
return true, nil
}
Expand All @@ -208,15 +208,15 @@ func (s *taskProcessorSuite) TestProcessTaskAndAck_NamespaceTrue_ProcessErrNoErr
func (s *taskProcessorSuite) TestHandleTaskError_EntityNotExists() {
err := serviceerror.NewNotFound("")

taskInfo := newTaskInfo(s.mockProcessor, nil, s.logger)
taskInfo := NewTaskInfo(s.mockProcessor, nil, s.logger)
s.Nil(s.taskProcessor.handleTaskError(s.scope, taskInfo, s.notificationChan, err))
}

func (s *taskProcessorSuite) TestHandleTaskError_ErrTaskRetry() {
err := consts.ErrTaskRetry
delay := time.Second

taskInfo := newTaskInfo(s.mockProcessor, nil, s.logger)
taskInfo := NewTaskInfo(s.mockProcessor, nil, s.logger)
go func() {
time.Sleep(delay)
s.notificationChan <- struct{}{}
Expand All @@ -231,14 +231,14 @@ func (s *taskProcessorSuite) TestHandleTaskError_ErrTaskRetry() {
func (s *taskProcessorSuite) TestHandleTaskError_ErrTaskDiscarded() {
err := consts.ErrTaskDiscarded

taskInfo := newTaskInfo(s.mockProcessor, nil, s.logger)
taskInfo := NewTaskInfo(s.mockProcessor, nil, s.logger)
s.Nil(s.taskProcessor.handleTaskError(s.scope, taskInfo, s.notificationChan, err))
}

func (s *taskProcessorSuite) TestHandleTaskError_NamespaceNotActiveError() {
err := serviceerror.NewNamespaceNotActive("", "", "")

taskInfo := newTaskInfo(s.mockProcessor, nil, s.logger)
taskInfo := NewTaskInfo(s.mockProcessor, nil, s.logger)
taskInfo.startTime = time.Now().UTC().Add(-namespace.CacheRefreshInterval * time.Duration(3))
s.Nil(s.taskProcessor.handleTaskError(s.scope, taskInfo, s.notificationChan, err))

Expand All @@ -249,19 +249,19 @@ func (s *taskProcessorSuite) TestHandleTaskError_NamespaceNotActiveError() {
func (s *taskProcessorSuite) TestHandleTaskError_CurrentWorkflowConditionFailedError() {
err := &persistence.CurrentWorkflowConditionFailedError{}

taskInfo := newTaskInfo(s.mockProcessor, nil, s.logger)
taskInfo := NewTaskInfo(s.mockProcessor, nil, s.logger)
s.Nil(s.taskProcessor.handleTaskError(s.scope, taskInfo, s.notificationChan, err))
}

func (s *taskProcessorSuite) TestHandleTaskError_RandomErr() {
err := errors.New("random error")

taskInfo := newTaskInfo(s.mockProcessor, nil, s.logger)
taskInfo := NewTaskInfo(s.mockProcessor, nil, s.logger)
s.Equal(err, s.taskProcessor.handleTaskError(s.scope, taskInfo, s.notificationChan, err))
}

func (s *taskProcessorSuite) TestProcessTaskAndAck_SetsUserLatencyCorrectly() {
task := newTaskInfo(s.mockProcessor, &taskForTest{Key: tasks.Key{TaskID: 12345, FireTime: time.Now().UTC()}}, s.logger)
task := NewTaskInfo(s.mockProcessor, &taskForTest{Key: tasks.Key{TaskID: 12345, FireTime: time.Now().UTC()}}, s.logger)
task.shouldProcessTask = false
var taskFilter taskFilter = func(task tasks.Task) (bool, error) {
return false, nil
Expand Down
6 changes: 3 additions & 3 deletions service/history/timerQueueAckMgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,17 +118,17 @@ func (t timerKeys) Swap(i, j int) {

// Less implements sort.Interface
func (t timerKeys) Less(i, j int) bool {
return compareTimerIDLess(&t[i], &t[j])
return CompareTimerIDLess(&t[i], &t[j])
}

func newTimerKey(time time.Time, taskID int64) *timerKey {
func NewTimerKey(time time.Time, taskID int64) *timerKey {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shall we deprecate the TimerKey and use tasks.Key instead?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. Good to know. I will update this.

return &timerKey{
VisibilityTimestamp: time,
TaskID: taskID,
}
}

func compareTimerIDLess(first *timerKey, second *timerKey) bool {
func CompareTimerIDLess(first *timerKey, second *timerKey) bool {
if first.VisibilityTimestamp.Before(second.VisibilityTimestamp) {
return true
}
Expand Down
4 changes: 2 additions & 2 deletions service/history/timerQueueProcessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ func (t *timerQueueProcessorImpl) completeTimers() error {
t.standbyTimerProcessorsLock.RLock()
for _, standbyTimerProcessor := range t.standbyTimerProcessors {
ackLevel := standbyTimerProcessor.getAckLevel()
if !compareTimerIDLess(&upperAckLevel, &ackLevel) {
if !CompareTimerIDLess(&upperAckLevel, &ackLevel) {
upperAckLevel = ackLevel
}
}
Expand All @@ -309,7 +309,7 @@ func (t *timerQueueProcessorImpl) completeTimers() error {
}

t.logger.Debug("Start completing timer task", tag.AckLevel(lowerAckLevel), tag.AckLevel(upperAckLevel))
if !compareTimerIDLess(&lowerAckLevel, &upperAckLevel) {
if !CompareTimerIDLess(&lowerAckLevel, &upperAckLevel) {
return nil
}

Expand Down
14 changes: 7 additions & 7 deletions service/history/timerQueueProcessorBase.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,11 +101,11 @@ func newTimerQueueProcessorBase(

var taskProcessor *taskProcessor
if !config.TimerProcessorEnablePriorityTaskProcessor() {
options := taskProcessorOptions{
workerCount: config.TimerTaskWorkerCount(),
queueSize: config.TimerTaskWorkerCount() * config.TimerTaskBatchSize(),
options := TaskProcessorOptions{
WorkerCount: config.TimerTaskWorkerCount(),
QueueSize: config.TimerTaskWorkerCount() * config.TimerTaskBatchSize(),
}
taskProcessor = newTaskProcessor(options, shard, workflowCache, logger)
taskProcessor = NewTaskProcessor(options, shard, workflowCache, logger)
}

base := &timerQueueProcessorBase{
Expand Down Expand Up @@ -141,7 +141,7 @@ func (t *timerQueueProcessorBase) Start() {
}

if t.taskProcessor != nil {
t.taskProcessor.start()
t.taskProcessor.Start()
}
t.shutdownWG.Add(1)
// notify a initial scan
Expand All @@ -165,7 +165,7 @@ func (t *timerQueueProcessorBase) Stop() {
}

if t.taskProcessor != nil {
t.taskProcessor.stop()
t.taskProcessor.Stop()
}
t.logger.Info("Timer queue processor stopped.")
}
Expand Down Expand Up @@ -346,7 +346,7 @@ func (t *timerQueueProcessorBase) submitTask(
) bool {

return t.taskProcessor.addTask(
newTaskInfo(
NewTaskInfo(
t.timerProcessor,
taskInfo,
initializeLoggerForTask(t.shard.GetShardID(), taskInfo, t.logger),
Expand Down