From bb7b984ef5e2866037cd47e6d377a412780ecbb3 Mon Sep 17 00:00:00 2001 From: Yichao Yang Date: Tue, 15 Mar 2022 10:42:12 -0700 Subject: [PATCH] Make matching persistence range queries [inclusive, exclusive) (#2599) - Range for matching GetTasks is now [inclusive, exclusive), instead of (exclusive, inclusive] - Range for matching CompleteTasksLessThan is now < max TaskID, instead of <= - Range for admin ListTaskQueueTasks is now [inclusive, exclusive) --- .../cassandra/matching_task_store.go | 14 ++-- common/persistence/dataInterfaces.go | 14 ++-- .../persistence-tests/persistenceTestBase.go | 29 --------- .../sql/sqlplugin/matching_task.go | 21 +++--- .../persistence/sql/sqlplugin/mysql/task.go | 18 +++--- .../sql/sqlplugin/postgresql/task.go | 18 +++--- .../persistence/sql/sqlplugin/sqlite/task.go | 18 +++--- .../sql/sqlplugin/tests/matching_task_test.go | 64 +++++++++---------- common/persistence/sql/task.go | 41 ++++++------ common/persistence/task_manager.go | 2 +- common/persistence/tests/task_queue_task.go | 32 +++++----- service/frontend/adminHandler.go | 5 +- service/matching/db.go | 24 ++++--- service/matching/db_task_manager.go | 10 +-- service/matching/db_task_manager_test.go | 20 +++--- service/matching/db_task_reader.go | 4 +- service/matching/db_task_reader_test.go | 40 ++++++------ service/matching/matchingEngine_test.go | 8 +-- service/matching/taskGC.go | 2 +- service/matching/taskReader.go | 2 +- service/worker/scanner/taskqueue/db.go | 16 ++--- service/worker/scanner/taskqueue/handler.go | 4 +- .../worker/scanner/taskqueue/mocks_test.go | 2 +- .../scanner/taskqueue/scavenger_test.go | 2 +- tools/cli/admin.go | 4 +- 25 files changed, 193 insertions(+), 221 deletions(-) diff --git a/common/persistence/cassandra/matching_task_store.go b/common/persistence/cassandra/matching_task_store.go index c7836a17f36..92d91ca4552 100644 --- a/common/persistence/cassandra/matching_task_store.go +++ b/common/persistence/cassandra/matching_task_store.go @@ -54,8 +54,8 @@ const ( `and task_queue_name = ? ` + `and task_queue_type = ? ` + `and type = ? ` + - `and task_id > ? ` + - `and task_id <= ?` + `and task_id >= ? ` + + `and task_id < ?` templateCompleteTaskQuery = `DELETE FROM tasks ` + `WHERE namespace_id = ? ` + @@ -69,7 +69,7 @@ const ( `AND task_queue_name = ? ` + `AND task_queue_type = ? ` + `AND type = ? ` + - `AND task_id <= ? ` + `AND task_id < ? ` templateGetTaskQueueQuery = `SELECT ` + `range_id, ` + @@ -388,8 +388,8 @@ func (d *MatchingTaskStore) GetTasks( request.TaskQueue, request.TaskType, rowTypeTask, - request.MinTaskIDExclusive, - request.MaxTaskIDInclusive, + request.InclusiveMinTaskID, + request.ExclusiveMaxTaskID, ) iter := query.PageSize(request.PageSize).PageState(request.NextPageToken).Iter() @@ -455,14 +455,14 @@ func (d *MatchingTaskStore) CompleteTask( return nil } -// CompleteTasksLessThan deletes all tasks less than or equal to the given task id. This API ignores the +// CompleteTasksLessThan deletes all tasks less than the given task id. This API ignores the // Limit request parameter i.e. either all tasks leq the task_id will be deleted or an error will // be returned to the caller func (d *MatchingTaskStore) CompleteTasksLessThan( request *p.CompleteTasksLessThanRequest, ) (int, error) { query := d.Session.Query(templateCompleteTasksLessThanQuery, - request.NamespaceID, request.TaskQueueName, request.TaskType, rowTypeTask, request.TaskID) + request.NamespaceID, request.TaskQueueName, request.TaskType, rowTypeTask, request.ExclusiveMaxTaskID) err := query.Exec() if err != nil { return 0, gocql.ConvertError("CompleteTasksLessThan", err) diff --git a/common/persistence/dataInterfaces.go b/common/persistence/dataInterfaces.go index 10e5625bb02..f618e1b9830 100644 --- a/common/persistence/dataInterfaces.go +++ b/common/persistence/dataInterfaces.go @@ -565,8 +565,8 @@ type ( NamespaceID string TaskQueue string TaskType enumspb.TaskQueueType - MinTaskIDExclusive int64 // exclusive - MaxTaskIDInclusive int64 // inclusive + InclusiveMinTaskID int64 + ExclusiveMaxTaskID int64 PageSize int NextPageToken []byte } @@ -585,11 +585,11 @@ type ( // CompleteTasksLessThanRequest contains the request params needed to invoke CompleteTasksLessThan API CompleteTasksLessThanRequest struct { - NamespaceID string - TaskQueueName string - TaskType enumspb.TaskQueueType - TaskID int64 // Tasks less than or equal to this ID will be completed - Limit int // Limit on the max number of tasks that can be completed. Required param + NamespaceID string + TaskQueueName string + TaskType enumspb.TaskQueueType + ExclusiveMaxTaskID int64 // Tasks less than this ID will be completed + Limit int // Limit on the max number of tasks that can be completed. Required param } // CreateNamespaceRequest is used to create the namespace diff --git a/common/persistence/persistence-tests/persistenceTestBase.go b/common/persistence/persistence-tests/persistenceTestBase.go index f486f04cee0..868c7ecd5c5 100644 --- a/common/persistence/persistence-tests/persistenceTestBase.go +++ b/common/persistence/persistence-tests/persistenceTestBase.go @@ -1241,35 +1241,6 @@ func (s *TestBase) RangeCompleteTimerTask(inclusiveBeginTimestamp time.Time, exc }) } -// GetTasks is a utility method to get tasks from persistence -func (s *TestBase) GetTasks(namespaceID string, taskQueue string, taskType enumspb.TaskQueueType, batchSize int) (*persistence.GetTasksResponse, error) { - response, err := s.TaskMgr.GetTasks(&persistence.GetTasksRequest{ - NamespaceID: namespaceID, - TaskQueue: taskQueue, - TaskType: taskType, - PageSize: batchSize, - MaxTaskIDInclusive: math.MaxInt64, - }) - - if err != nil { - return nil, err - } - - return &persistence.GetTasksResponse{Tasks: response.Tasks}, nil -} - -// CompleteTask is a utility method to complete a task -func (s *TestBase) CompleteTask(namespaceID string, taskQueue string, taskType enumspb.TaskQueueType, taskID int64) error { - return s.TaskMgr.CompleteTask(&persistence.CompleteTaskRequest{ - TaskQueue: &persistence.TaskQueueKey{ - NamespaceID: namespaceID, - TaskQueueType: taskType, - TaskQueueName: taskQueue, - }, - TaskID: taskID, - }) -} - // TearDownWorkflowStore to cleanup func (s *TestBase) TearDownWorkflowStore() { s.TaskMgr.Close() diff --git a/common/persistence/sql/sqlplugin/matching_task.go b/common/persistence/sql/sqlplugin/matching_task.go index 163ec15538d..247abbd507a 100644 --- a/common/persistence/sql/sqlplugin/matching_task.go +++ b/common/persistence/sql/sqlplugin/matching_task.go @@ -42,29 +42,28 @@ type ( // TasksFilter contains the column names within tasks table that // can be used to filter results through a WHERE clause TasksFilter struct { - RangeHash uint32 - TaskQueueID []byte - TaskID *int64 - MinTaskID *int64 - MaxTaskID *int64 - TaskIDLessThanEquals *int64 - Limit *int - PageSize *int + RangeHash uint32 + TaskQueueID []byte + TaskID *int64 + InclusiveMinTaskID *int64 + ExclusiveMaxTaskID *int64 + Limit *int + PageSize *int } // MatchingTask is the SQL persistence interface for matching tasks MatchingTask interface { InsertIntoTasks(ctx context.Context, rows []TasksRow) (sql.Result, error) // SelectFromTasks retrieves one or more rows from the tasks table - // Required filter params - {namespaceID, taskqueueName, taskType, minTaskID, maxTaskID, pageSize} + // Required filter params - {namespaceID, taskqueueName, taskType, inclusiveMinTaskID, exclusiveMaxTaskID, pageSize} SelectFromTasks(ctx context.Context, filter TasksFilter) ([]TasksRow, error) // DeleteFromTasks deletes a row from tasks table // Required filter params: // to delete single row // - {namespaceID, taskqueueName, taskType, taskID} // to delete multiple rows - // - {namespaceID, taskqueueName, taskType, taskIDLessThanEquals, limit } - // - this will delete upto limit number of tasks less than or equal to the given task id + // - {namespaceID, taskqueueName, taskType, exclusiveMaxTaskID, limit } + // - this will delete upto limit number of tasks less than the given max task id DeleteFromTasks(ctx context.Context, filter TasksFilter) (sql.Result, error) } ) diff --git a/common/persistence/sql/sqlplugin/mysql/task.go b/common/persistence/sql/sqlplugin/mysql/task.go index 34683dc83db..49ba4e8489d 100644 --- a/common/persistence/sql/sqlplugin/mysql/task.go +++ b/common/persistence/sql/sqlplugin/mysql/task.go @@ -70,12 +70,12 @@ task_queue_id = :task_queue_id // *** Tasks Below *** getTaskMinMaxQry = `SELECT task_id, data, data_encoding ` + `FROM tasks ` + - `WHERE range_hash = ? AND task_queue_id = ? AND task_id > ? AND task_id <= ? ` + + `WHERE range_hash = ? AND task_queue_id = ? AND task_id >= ? AND task_id < ? ` + ` ORDER BY task_id LIMIT ?` getTaskMinQry = `SELECT task_id, data, data_encoding ` + `FROM tasks ` + - `WHERE range_hash = ? AND task_queue_id = ? AND task_id > ? ORDER BY task_id LIMIT ?` + `WHERE range_hash = ? AND task_queue_id = ? AND task_id >= ? ORDER BY task_id LIMIT ?` createTaskQry = `INSERT INTO ` + `tasks(range_hash, task_queue_id, task_id, data, data_encoding) ` + @@ -85,7 +85,7 @@ task_queue_id = :task_queue_id `WHERE range_hash = ? AND task_queue_id = ? AND task_id = ?` rangeDeleteTaskQry = `DELETE FROM tasks ` + - `WHERE range_hash = ? AND task_queue_id = ? AND task_id <= ? ` + + `WHERE range_hash = ? AND task_queue_id = ? AND task_id < ? ` + `ORDER BY task_queue_id,task_id LIMIT ?` ) @@ -108,13 +108,13 @@ func (mdb *db) SelectFromTasks( var err error var rows []sqlplugin.TasksRow switch { - case filter.MaxTaskID != nil: + case filter.ExclusiveMaxTaskID != nil: err = mdb.conn.SelectContext(ctx, &rows, getTaskMinMaxQry, filter.RangeHash, filter.TaskQueueID, - *filter.MinTaskID, - *filter.MaxTaskID, + *filter.InclusiveMinTaskID, + *filter.ExclusiveMaxTaskID, *filter.PageSize, ) default: @@ -122,7 +122,7 @@ func (mdb *db) SelectFromTasks( &rows, getTaskMinQry, filter.RangeHash, filter.TaskQueueID, - *filter.MinTaskID, + *filter.InclusiveMinTaskID, *filter.PageSize, ) } @@ -137,7 +137,7 @@ func (mdb *db) DeleteFromTasks( ctx context.Context, filter sqlplugin.TasksFilter, ) (sql.Result, error) { - if filter.TaskIDLessThanEquals != nil { + if filter.ExclusiveMaxTaskID != nil { if filter.Limit == nil || *filter.Limit == 0 { return nil, fmt.Errorf("missing limit parameter") } @@ -145,7 +145,7 @@ func (mdb *db) DeleteFromTasks( rangeDeleteTaskQry, filter.RangeHash, filter.TaskQueueID, - *filter.TaskIDLessThanEquals, + *filter.ExclusiveMaxTaskID, *filter.Limit, ) } diff --git a/common/persistence/sql/sqlplugin/postgresql/task.go b/common/persistence/sql/sqlplugin/postgresql/task.go index 05c1a43b57f..3fb11d43d97 100644 --- a/common/persistence/sql/sqlplugin/postgresql/task.go +++ b/common/persistence/sql/sqlplugin/postgresql/task.go @@ -70,12 +70,12 @@ task_queue_id = :task_queue_id // *** Tasks Below *** getTaskMinMaxQry = `SELECT task_id, data, data_encoding ` + `FROM tasks ` + - `WHERE range_hash = $1 AND task_queue_id=$2 AND task_id > $3 AND task_id <= $4 ` + + `WHERE range_hash = $1 AND task_queue_id=$2 AND task_id >= $3 AND task_id < $4 ` + `ORDER BY task_id LIMIT $5` getTaskMinQry = `SELECT task_id, data, data_encoding ` + `FROM tasks ` + - `WHERE range_hash = $1 AND task_queue_id = $2 AND task_id > $3 ORDER BY task_id LIMIT $4` + `WHERE range_hash = $1 AND task_queue_id = $2 AND task_id >= $3 ORDER BY task_id LIMIT $4` createTaskQry = `INSERT INTO ` + `tasks(range_hash, task_queue_id, task_id, data, data_encoding) ` + @@ -86,7 +86,7 @@ task_queue_id = :task_queue_id rangeDeleteTaskQry = `DELETE FROM tasks ` + `WHERE range_hash = $1 AND task_queue_id = $2 AND task_id IN (SELECT task_id FROM - tasks WHERE range_hash = $1 AND task_queue_id = $2 AND task_id <= $3 ` + + tasks WHERE range_hash = $1 AND task_queue_id = $2 AND task_id < $3 ` + `ORDER BY task_queue_id,task_id LIMIT $4 )` ) @@ -109,14 +109,14 @@ func (pdb *db) SelectFromTasks( var err error var rows []sqlplugin.TasksRow switch { - case filter.MaxTaskID != nil: + case filter.ExclusiveMaxTaskID != nil: err = pdb.conn.SelectContext(ctx, &rows, getTaskMinMaxQry, filter.RangeHash, filter.TaskQueueID, - *filter.MinTaskID, - *filter.MaxTaskID, + *filter.InclusiveMinTaskID, + *filter.ExclusiveMaxTaskID, *filter.PageSize, ) default: @@ -125,7 +125,7 @@ func (pdb *db) SelectFromTasks( getTaskMinQry, filter.RangeHash, filter.TaskQueueID, - *filter.MinTaskID, + *filter.InclusiveMinTaskID, *filter.PageSize, ) } @@ -137,7 +137,7 @@ func (pdb *db) DeleteFromTasks( ctx context.Context, filter sqlplugin.TasksFilter, ) (sql.Result, error) { - if filter.TaskIDLessThanEquals != nil { + if filter.ExclusiveMaxTaskID != nil { if filter.Limit == nil || *filter.Limit == 0 { return nil, fmt.Errorf("missing limit parameter") } @@ -145,7 +145,7 @@ func (pdb *db) DeleteFromTasks( rangeDeleteTaskQry, filter.RangeHash, filter.TaskQueueID, - *filter.TaskIDLessThanEquals, + *filter.ExclusiveMaxTaskID, *filter.Limit, ) } diff --git a/common/persistence/sql/sqlplugin/sqlite/task.go b/common/persistence/sql/sqlplugin/sqlite/task.go index 6eaf9e6ef0d..4119f242ef1 100644 --- a/common/persistence/sql/sqlplugin/sqlite/task.go +++ b/common/persistence/sql/sqlplugin/sqlite/task.go @@ -74,12 +74,12 @@ task_queue_id = :task_queue_id // *** Tasks Below *** getTaskMinMaxQry = `SELECT task_id, data, data_encoding ` + `FROM tasks ` + - `WHERE range_hash = ? AND task_queue_id = ? AND task_id > ? AND task_id <= ? ` + + `WHERE range_hash = ? AND task_queue_id = ? AND task_id >= ? AND task_id < ? ` + ` ORDER BY task_id LIMIT ?` getTaskMinQry = `SELECT task_id, data, data_encoding ` + `FROM tasks ` + - `WHERE range_hash = ? AND task_queue_id = ? AND task_id > ? ORDER BY task_id LIMIT ?` + `WHERE range_hash = ? AND task_queue_id = ? AND task_id >= ? ORDER BY task_id LIMIT ?` createTaskQry = `INSERT INTO ` + `tasks(range_hash, task_queue_id, task_id, data, data_encoding) ` + @@ -90,7 +90,7 @@ task_queue_id = :task_queue_id rangeDeleteTaskQry = `DELETE FROM tasks ` + `WHERE range_hash = ? AND task_queue_id = ? AND task_id IN (SELECT task_id FROM - tasks WHERE range_hash = ? AND task_queue_id = ? AND task_id <= ? ` + + tasks WHERE range_hash = ? AND task_queue_id = ? AND task_id < ? ` + `ORDER BY task_queue_id,task_id LIMIT ? ) ` ) @@ -113,13 +113,13 @@ func (mdb *db) SelectFromTasks( var err error var rows []sqlplugin.TasksRow switch { - case filter.MaxTaskID != nil: + case filter.ExclusiveMaxTaskID != nil: err = mdb.conn.SelectContext(ctx, &rows, getTaskMinMaxQry, filter.RangeHash, filter.TaskQueueID, - *filter.MinTaskID, - *filter.MaxTaskID, + *filter.InclusiveMinTaskID, + *filter.ExclusiveMaxTaskID, *filter.PageSize, ) default: @@ -127,7 +127,7 @@ func (mdb *db) SelectFromTasks( &rows, getTaskMinQry, filter.RangeHash, filter.TaskQueueID, - *filter.MinTaskID, + *filter.ExclusiveMaxTaskID, *filter.PageSize, ) } @@ -142,7 +142,7 @@ func (mdb *db) DeleteFromTasks( ctx context.Context, filter sqlplugin.TasksFilter, ) (sql.Result, error) { - if filter.TaskIDLessThanEquals != nil { + if filter.ExclusiveMaxTaskID != nil { if filter.Limit == nil || *filter.Limit == 0 { return nil, fmt.Errorf("missing limit parameter") } @@ -152,7 +152,7 @@ func (mdb *db) DeleteFromTasks( filter.TaskQueueID, filter.RangeHash, filter.TaskQueueID, - *filter.TaskIDLessThanEquals, + *filter.ExclusiveMaxTaskID, *filter.Limit, ) } diff --git a/common/persistence/sql/sqlplugin/tests/matching_task_test.go b/common/persistence/sql/sqlplugin/tests/matching_task_test.go index 0a076ee0db0..ba4f5de5adc 100644 --- a/common/persistence/sql/sqlplugin/tests/matching_task_test.go +++ b/common/persistence/sql/sqlplugin/tests/matching_task_test.go @@ -153,15 +153,15 @@ func (s *matchingTaskSuite) TestInsertSelect_Single() { s.NoError(err) s.Equal(1, int(rowsAffected)) - minTaskID := convert.Int64Ptr(taskID - 1) - maxTaskID := convert.Int64Ptr(taskID) + inclusiveMinTaskID := convert.Int64Ptr(taskID) + exclusiveMaxTaskID := convert.Int64Ptr(taskID + 1) pageSize := convert.IntPtr(1) filter := sqlplugin.TasksFilter{ - RangeHash: testMatchingTaskRangeHash, - TaskQueueID: queueID, - MinTaskID: minTaskID, - MaxTaskID: maxTaskID, - PageSize: pageSize, + RangeHash: testMatchingTaskRangeHash, + TaskQueueID: queueID, + InclusiveMinTaskID: inclusiveMinTaskID, + ExclusiveMaxTaskID: exclusiveMaxTaskID, + PageSize: pageSize, } rows, err := s.store.SelectFromTasks(newExecutionContext(), filter) s.NoError(err) @@ -186,15 +186,15 @@ func (s *matchingTaskSuite) TestInsertSelect_Multiple() { s.NoError(err) s.Equal(2, int(rowsAffected)) - minTaskID := convert.Int64Ptr(taskID - 2) - maxTaskID := convert.Int64Ptr(taskID) + inclusiveMinTaskID := convert.Int64Ptr(taskID - 1) + exclusiveMaxTaskID := convert.Int64Ptr(taskID + 1) pageSize := convert.IntPtr(2) filter := sqlplugin.TasksFilter{ - RangeHash: testMatchingTaskRangeHash, - TaskQueueID: queueID, - MinTaskID: minTaskID, - MaxTaskID: maxTaskID, - PageSize: pageSize, + RangeHash: testMatchingTaskRangeHash, + TaskQueueID: queueID, + InclusiveMinTaskID: inclusiveMinTaskID, + ExclusiveMaxTaskID: exclusiveMaxTaskID, + PageSize: pageSize, } rows, err := s.store.SelectFromTasks(newExecutionContext(), filter) s.NoError(err) @@ -244,15 +244,15 @@ func (s *matchingTaskSuite) TestInsertDeleteSelect_Single() { s.NoError(err) s.Equal(1, int(rowsAffected)) - minTaskID := convert.Int64Ptr(taskID - 1) - maxTaskID := convert.Int64Ptr(taskID) + inclusiveMinTaskID := convert.Int64Ptr(taskID) + exclusiveMaxTaskID := convert.Int64Ptr(taskID + 1) pageSize := convert.IntPtr(1) filter = sqlplugin.TasksFilter{ - RangeHash: testMatchingTaskRangeHash, - TaskQueueID: queueID, - MinTaskID: minTaskID, - MaxTaskID: maxTaskID, - PageSize: pageSize, + RangeHash: testMatchingTaskRangeHash, + TaskQueueID: queueID, + InclusiveMinTaskID: inclusiveMinTaskID, + ExclusiveMaxTaskID: exclusiveMaxTaskID, + PageSize: pageSize, } rows, err := s.store.SelectFromTasks(newExecutionContext(), filter) s.NoError(err) @@ -273,10 +273,10 @@ func (s *matchingTaskSuite) TestInsertDeleteSelect_Multiple() { s.Equal(2, int(rowsAffected)) filter := sqlplugin.TasksFilter{ - RangeHash: testMatchingTaskRangeHash, - TaskQueueID: queueID, - TaskIDLessThanEquals: convert.Int64Ptr(taskID), - Limit: convert.IntPtr(2), + RangeHash: testMatchingTaskRangeHash, + TaskQueueID: queueID, + ExclusiveMaxTaskID: convert.Int64Ptr(taskID + 1), + Limit: convert.IntPtr(2), } result, err = s.store.DeleteFromTasks(newExecutionContext(), filter) s.NoError(err) @@ -284,15 +284,15 @@ func (s *matchingTaskSuite) TestInsertDeleteSelect_Multiple() { s.NoError(err) s.Equal(2, int(rowsAffected)) - minTaskID := convert.Int64Ptr(taskID - 2) - maxTaskID := convert.Int64Ptr(taskID) + inclusiveMinTaskID := convert.Int64Ptr(taskID - 1) + exclusiveMaxTaskID := convert.Int64Ptr(taskID + 1) pageSize := convert.IntPtr(2) filter = sqlplugin.TasksFilter{ - RangeHash: testMatchingTaskRangeHash, - TaskQueueID: queueID, - MinTaskID: minTaskID, - MaxTaskID: maxTaskID, - PageSize: pageSize, + RangeHash: testMatchingTaskRangeHash, + TaskQueueID: queueID, + InclusiveMinTaskID: inclusiveMinTaskID, + ExclusiveMaxTaskID: exclusiveMaxTaskID, + PageSize: pageSize, } rows, err := s.store.SelectFromTasks(newExecutionContext(), filter) s.NoError(err) diff --git a/common/persistence/sql/task.go b/common/persistence/sql/task.go index db199148c2d..024d1b1e5c1 100644 --- a/common/persistence/sql/task.go +++ b/common/persistence/sql/task.go @@ -389,23 +389,23 @@ func (m *sqlTaskManager) GetTasks( return nil, serviceerror.NewUnavailable(err.Error()) } - minTaskID := request.MinTaskIDExclusive - maxTaskID := request.MaxTaskIDInclusive + inclusiveMinTaskID := request.InclusiveMinTaskID + exclusiveMaxTaskID := request.ExclusiveMaxTaskID if len(request.NextPageToken) != 0 { token, err := deserializeMatchingTaskPageToken(request.NextPageToken) if err != nil { return nil, err } - minTaskID = token.TaskID + inclusiveMinTaskID = token.TaskID } tqId, tqHash := m.taskQueueIdAndHash(nidBytes, request.TaskQueue, request.TaskType) rows, err := m.Db.SelectFromTasks(ctx, sqlplugin.TasksFilter{ - RangeHash: tqHash, - TaskQueueID: tqId, - MinTaskID: &minTaskID, - MaxTaskID: &maxTaskID, - PageSize: &request.PageSize, + RangeHash: tqHash, + TaskQueueID: tqId, + InclusiveMinTaskID: &inclusiveMinTaskID, + ExclusiveMaxTaskID: &exclusiveMaxTaskID, + PageSize: &request.PageSize, }) if err != nil { return nil, serviceerror.NewUnavailable(fmt.Sprintf("GetTasks operation failed. Failed to get rows. Error: %v", err)) @@ -418,15 +418,16 @@ func (m *sqlTaskManager) GetTasks( response.Tasks[i] = persistence.NewDataBlob(v.Data, v.DataEncoding) } if len(rows) == request.PageSize { - token, err := serializeMatchingTaskPageToken(&matchingTaskPageToken{ - TaskID: rows[len(rows)-1].TaskID, - }) - if err != nil { - return nil, err + nextTaskID := rows[len(rows)-1].TaskID + 1 + if nextTaskID < exclusiveMaxTaskID { + token, err := serializeMatchingTaskPageToken(&matchingTaskPageToken{ + TaskID: nextTaskID, + }) + if err != nil { + return nil, err + } + response.NextPageToken = token } - response.NextPageToken = token - } else { - response.NextPageToken = nil } return response, nil @@ -465,10 +466,10 @@ func (m *sqlTaskManager) CompleteTasksLessThan( } tqId, tqHash := m.taskQueueIdAndHash(nidBytes, request.TaskQueueName, request.TaskType) result, err := m.Db.DeleteFromTasks(ctx, sqlplugin.TasksFilter{ - RangeHash: tqHash, - TaskQueueID: tqId, - TaskIDLessThanEquals: &request.TaskID, - Limit: &request.Limit, + RangeHash: tqHash, + TaskQueueID: tqId, + ExclusiveMaxTaskID: &request.ExclusiveMaxTaskID, + Limit: &request.Limit, }) if err != nil { return 0, serviceerror.NewUnavailable(err.Error()) diff --git a/common/persistence/task_manager.go b/common/persistence/task_manager.go index 6ada1a52572..e514fbfe12d 100644 --- a/common/persistence/task_manager.go +++ b/common/persistence/task_manager.go @@ -194,7 +194,7 @@ func (m *taskManagerImpl) CreateTasks(request *CreateTasksRequest) (*CreateTasks } func (m *taskManagerImpl) GetTasks(request *GetTasksRequest) (*GetTasksResponse, error) { - if request.MinTaskIDExclusive >= request.MaxTaskIDInclusive { + if request.InclusiveMinTaskID >= request.ExclusiveMaxTaskID { return &GetTasksResponse{}, nil } diff --git a/common/persistence/tests/task_queue_task.go b/common/persistence/tests/task_queue_task.go index 339b714af9b..0794815631e 100644 --- a/common/persistence/tests/task_queue_task.go +++ b/common/persistence/tests/task_queue_task.go @@ -115,8 +115,8 @@ func (s *TaskQueueTaskSuite) TestCreateGet_Conflict() { NamespaceID: s.namespaceID, TaskQueue: s.taskQueueName, TaskType: s.taskQueueType, - MinTaskIDExclusive: taskID - 1, - MaxTaskIDInclusive: taskID, + InclusiveMinTaskID: taskID, + ExclusiveMaxTaskID: taskID + 1, PageSize: 100, NextPageToken: nil, }) @@ -144,8 +144,8 @@ func (s *TaskQueueTaskSuite) TestCreateGet_One() { NamespaceID: s.namespaceID, TaskQueue: s.taskQueueName, TaskType: s.taskQueueType, - MinTaskIDExclusive: taskID - 1, - MaxTaskIDInclusive: taskID, + InclusiveMinTaskID: taskID, + ExclusiveMaxTaskID: taskID + 1, PageSize: 100, NextPageToken: nil, }) @@ -190,8 +190,8 @@ func (s *TaskQueueTaskSuite) TestCreateGet_Multiple() { NamespaceID: s.namespaceID, TaskQueue: s.taskQueueName, TaskType: s.taskQueueType, - MinTaskIDExclusive: minTaskID - 1, - MaxTaskIDInclusive: maxTaskID, + InclusiveMinTaskID: minTaskID, + ExclusiveMaxTaskID: maxTaskID + 1, PageSize: 1, NextPageToken: token, }) @@ -231,8 +231,8 @@ func (s *TaskQueueTaskSuite) TestCreateDelete_One() { NamespaceID: s.namespaceID, TaskQueue: s.taskQueueName, TaskType: s.taskQueueType, - MinTaskIDExclusive: taskID - 1, - MaxTaskIDInclusive: taskID, + InclusiveMinTaskID: taskID, + ExclusiveMaxTaskID: taskID + 1, PageSize: 100, NextPageToken: nil, }) @@ -251,14 +251,12 @@ func (s *TaskQueueTaskSuite) TestCreateDelete_Multiple() { rangeID := rand.Int63() taskQueue := s.createTaskQueue(rangeID) - var expectedTasks []*persistencespb.AllocatedTaskInfo for i := 0; i < numCreateBatch; i++ { var tasks []*persistencespb.AllocatedTaskInfo for j := 0; j < createBatchSize; j++ { taskID := minTaskID + int64(i*numCreateBatch+j) task := s.randomTask(taskID) tasks = append(tasks, task) - expectedTasks = append(expectedTasks, task) } _, err := s.taskManager.CreateTasks(&p.CreateTasksRequest{ TaskQueueInfo: &p.PersistedTaskQueueInfo{ @@ -271,11 +269,11 @@ func (s *TaskQueueTaskSuite) TestCreateDelete_Multiple() { } _, err := s.taskManager.CompleteTasksLessThan(&p.CompleteTasksLessThanRequest{ - NamespaceID: s.namespaceID, - TaskQueueName: s.taskQueueName, - TaskType: s.taskQueueType, - TaskID: maxTaskID, - Limit: int(numTasks), + NamespaceID: s.namespaceID, + TaskQueueName: s.taskQueueName, + TaskType: s.taskQueueType, + ExclusiveMaxTaskID: maxTaskID + 1, + Limit: int(numTasks), }) s.NoError(err) @@ -283,8 +281,8 @@ func (s *TaskQueueTaskSuite) TestCreateDelete_Multiple() { NamespaceID: s.namespaceID, TaskQueue: s.taskQueueName, TaskType: s.taskQueueType, - MinTaskIDExclusive: minTaskID - 1, - MaxTaskIDInclusive: maxTaskID, + InclusiveMinTaskID: minTaskID, + ExclusiveMaxTaskID: maxTaskID + 1, PageSize: 100, NextPageToken: nil, }) diff --git a/service/frontend/adminHandler.go b/service/frontend/adminHandler.go index cbf4179ff7d..7ecef979e76 100644 --- a/service/frontend/adminHandler.go +++ b/service/frontend/adminHandler.go @@ -1431,7 +1431,6 @@ func (adh *AdminHandler) ResendReplicationTasks( } // GetTaskQueueTasks returns tasks from task queue -// TODO: support pagination func (adh *AdminHandler) GetTaskQueueTasks( ctx context.Context, request *adminservice.GetTaskQueueTasksRequest, @@ -1453,8 +1452,8 @@ func (adh *AdminHandler) GetTaskQueueTasks( NamespaceID: namespaceID.String(), TaskQueue: request.GetTaskQueue(), TaskType: request.GetTaskQueueType(), - MinTaskIDExclusive: request.GetMinTaskId(), - MaxTaskIDInclusive: request.GetMaxTaskId(), + InclusiveMinTaskID: request.GetMinTaskId(), + ExclusiveMaxTaskID: request.GetMaxTaskId(), PageSize: int(request.GetBatchSize()), NextPageToken: request.NextPageToken, }) diff --git a/service/matching/db.go b/service/matching/db.go index e275ab4eaaf..3d6dd0e469f 100644 --- a/service/matching/db.go +++ b/service/matching/db.go @@ -222,14 +222,18 @@ func (db *taskQueueDB) CreateTasks(tasks []*persistencespb.AllocatedTaskInfo) (* } // GetTasks returns a batch of tasks between the given range -func (db *taskQueueDB) GetTasks(minTaskID int64, maxTaskID int64, batchSize int) (*persistence.GetTasksResponse, error) { +func (db *taskQueueDB) GetTasks( + inclusiveMinTaskID int64, + exclusiveMaxTaskID int64, + batchSize int, +) (*persistence.GetTasksResponse, error) { return db.store.GetTasks(&persistence.GetTasksRequest{ NamespaceID: db.namespaceID.String(), TaskQueue: db.taskQueueName, TaskType: db.taskType, PageSize: batchSize, - MinTaskIDExclusive: minTaskID, // exclusive - MaxTaskIDInclusive: maxTaskID, // inclusive + InclusiveMinTaskID: inclusiveMinTaskID, + ExclusiveMaxTaskID: exclusiveMaxTaskID, }) } @@ -257,19 +261,19 @@ func (db *taskQueueDB) CompleteTask(taskID int64) error { // CompleteTasksLessThan deletes of tasks less than the given taskID. Limit is // the upper bound of number of tasks that can be deleted by this method. It may // or may not be honored -func (db *taskQueueDB) CompleteTasksLessThan(taskID int64, limit int) (int, error) { +func (db *taskQueueDB) CompleteTasksLessThan(exclusiveMaxTaskID int64, limit int) (int, error) { n, err := db.store.CompleteTasksLessThan(&persistence.CompleteTasksLessThanRequest{ - NamespaceID: db.namespaceID.String(), - TaskQueueName: db.taskQueueName, - TaskType: db.taskType, - TaskID: taskID, - Limit: limit, + NamespaceID: db.namespaceID.String(), + TaskQueueName: db.taskQueueName, + TaskType: db.taskType, + ExclusiveMaxTaskID: exclusiveMaxTaskID, + Limit: limit, }) if err != nil { db.logger.Error("Persistent store operation failure", tag.StoreOperationCompleteTasksLessThan, tag.Error(err), - tag.TaskID(taskID), + tag.TaskID(exclusiveMaxTaskID), tag.WorkflowTaskQueueType(db.taskType), tag.WorkflowTaskQueueName(db.taskQueueName)) } diff --git a/service/matching/db_task_manager.go b/service/matching/db_task_manager.go index 0bd322f3cb3..b32a6b9ca79 100644 --- a/service/matching/db_task_manager.go +++ b/service/matching/db_task_manager.go @@ -332,11 +332,11 @@ func (d *dbTaskManager) deleteAckedTasks() { return } _, err := d.store.CompleteTasksLessThan(&persistence.CompleteTasksLessThanRequest{ - NamespaceID: d.taskQueueKey.NamespaceID, - TaskQueueName: d.taskQueueKey.TaskQueueName, - TaskType: d.taskQueueKey.TaskQueueType, - TaskID: ackedTaskID, - Limit: 100000, // TODO @wxing1292 why delete with limit? history service is not doing similar thing + NamespaceID: d.taskQueueKey.NamespaceID, + TaskQueueName: d.taskQueueKey.TaskQueueName, + TaskType: d.taskQueueKey.TaskQueueType, + ExclusiveMaxTaskID: ackedTaskID + 1, + Limit: 100000, // TODO @wxing1292 why delete with limit? history service is not doing similar thing }) if err != nil { d.logger.Error("dbTaskManager encountered task deletion error", tag.Error(err)) diff --git a/service/matching/db_task_manager_test.go b/service/matching/db_task_manager_test.go index 476a7947914..69168326a6d 100644 --- a/service/matching/db_task_manager_test.go +++ b/service/matching/db_task_manager_test.go @@ -354,11 +354,11 @@ func (s *dbTaskManagerSuite) TestDeleteAckedTasks_Success() { s.dbTaskManager.maxDeletedTaskIDInclusive = maxDeletedTaskIDInclusive s.store.EXPECT().CompleteTasksLessThan(&persistence.CompleteTasksLessThanRequest{ - NamespaceID: s.namespaceID, - TaskQueueName: s.taskQueueName, - TaskType: s.taskQueueType, - TaskID: s.ackedTaskID, - Limit: 100000, + NamespaceID: s.namespaceID, + TaskQueueName: s.taskQueueName, + TaskType: s.taskQueueType, + ExclusiveMaxTaskID: s.ackedTaskID + 1, + Limit: 100000, }).Return(0, nil) s.dbTaskManager.deleteAckedTasks() @@ -374,11 +374,11 @@ func (s *dbTaskManagerSuite) TestDeleteAckedTasks_Failed() { s.dbTaskManager.maxDeletedTaskIDInclusive = maxDeletedTaskIDInclusive s.store.EXPECT().CompleteTasksLessThan(&persistence.CompleteTasksLessThanRequest{ - NamespaceID: s.namespaceID, - TaskQueueName: s.taskQueueName, - TaskType: s.taskQueueType, - TaskID: s.ackedTaskID, - Limit: 100000, + NamespaceID: s.namespaceID, + TaskQueueName: s.taskQueueName, + TaskType: s.taskQueueType, + ExclusiveMaxTaskID: s.ackedTaskID + 1, + Limit: 100000, }).Return(0, serviceerror.NewUnavailable("random error")) s.dbTaskManager.deleteAckedTasks() diff --git a/service/matching/db_task_reader.go b/service/matching/db_task_reader.go index 085eb184b09..64086fdc77f 100644 --- a/service/matching/db_task_reader.go +++ b/service/matching/db_task_reader.go @@ -131,8 +131,8 @@ func (t *dbTaskReaderImpl) getPaginationFn( NamespaceID: t.taskQueueKey.NamespaceID, TaskQueue: t.taskQueueKey.TaskQueueName, TaskType: t.taskQueueKey.TaskQueueType, - MinTaskIDExclusive: minTaskID, // exclusive - MaxTaskIDInclusive: maxTaskID, // inclusive + InclusiveMinTaskID: minTaskID + 1, + ExclusiveMaxTaskID: maxTaskID + 1, PageSize: dbTaskReaderPageSize, NextPageToken: paginationToken, }) diff --git a/service/matching/db_task_reader_test.go b/service/matching/db_task_reader_test.go index 1460967146f..63613e2e4ec 100644 --- a/service/matching/db_task_reader_test.go +++ b/service/matching/db_task_reader_test.go @@ -107,8 +107,8 @@ func (s *dbTaskReaderSuite) TestIteration_Error() { TaskQueue: s.taskQueueName, TaskType: s.taskQueueType, PageSize: dbTaskReaderPageSize, - MinTaskIDExclusive: s.ackedTaskID, - MaxTaskIDInclusive: s.maxTaskID, + InclusiveMinTaskID: s.ackedTaskID + 1, + ExclusiveMaxTaskID: s.maxTaskID + 1, NextPageToken: nil, }).Return(nil, serviceerror.NewInternal("random error")) @@ -139,8 +139,8 @@ func (s *dbTaskReaderSuite) TestIteration_ErrorRetry() { TaskQueue: s.taskQueueName, TaskType: s.taskQueueType, PageSize: dbTaskReaderPageSize, - MinTaskIDExclusive: s.ackedTaskID, - MaxTaskIDInclusive: s.maxTaskID, + InclusiveMinTaskID: s.ackedTaskID + 1, + ExclusiveMaxTaskID: s.maxTaskID + 1, NextPageToken: nil, }).Return(&persistence.GetTasksResponse{ Tasks: tasks1, @@ -151,8 +151,8 @@ func (s *dbTaskReaderSuite) TestIteration_ErrorRetry() { TaskQueue: s.taskQueueName, TaskType: s.taskQueueType, PageSize: dbTaskReaderPageSize, - MinTaskIDExclusive: s.ackedTaskID, - MaxTaskIDInclusive: s.maxTaskID, + InclusiveMinTaskID: s.ackedTaskID + 1, + ExclusiveMaxTaskID: s.maxTaskID + 1, NextPageToken: token, }).Return(nil, serviceerror.NewInternal("some random error")), s.taskStore.EXPECT().GetTasks(&persistence.GetTasksRequest{ @@ -160,8 +160,8 @@ func (s *dbTaskReaderSuite) TestIteration_ErrorRetry() { TaskQueue: s.taskQueueName, TaskType: s.taskQueueType, PageSize: dbTaskReaderPageSize, - MinTaskIDExclusive: taskID1, - MaxTaskIDInclusive: s.maxTaskID, + InclusiveMinTaskID: taskID1 + 1, + ExclusiveMaxTaskID: s.maxTaskID + 1, NextPageToken: nil, }).Return(&persistence.GetTasksResponse{ Tasks: tasks2, @@ -223,8 +223,8 @@ func (s *dbTaskReaderSuite) TestIteration_TwoIter() { TaskQueue: s.taskQueueName, TaskType: s.taskQueueType, PageSize: dbTaskReaderPageSize, - MinTaskIDExclusive: s.ackedTaskID, - MaxTaskIDInclusive: maxTaskID1, + InclusiveMinTaskID: s.ackedTaskID + 1, + ExclusiveMaxTaskID: maxTaskID1 + 1, NextPageToken: nil, }).Return(&persistence.GetTasksResponse{ Tasks: tasks1, @@ -253,8 +253,8 @@ func (s *dbTaskReaderSuite) TestIteration_TwoIter() { TaskQueue: s.taskQueueName, TaskType: s.taskQueueType, PageSize: dbTaskReaderPageSize, - MinTaskIDExclusive: s.taskTracker.loadedTaskID, - MaxTaskIDInclusive: maxTaskID2, + InclusiveMinTaskID: s.taskTracker.loadedTaskID + 1, + ExclusiveMaxTaskID: maxTaskID2 + 1, NextPageToken: nil, }).Return(&persistence.GetTasksResponse{ Tasks: tasks2, @@ -303,8 +303,8 @@ func (s *dbTaskReaderSuite) TestIteration_Pagination() { TaskQueue: s.taskQueueName, TaskType: s.taskQueueType, PageSize: dbTaskReaderPageSize, - MinTaskIDExclusive: s.ackedTaskID, - MaxTaskIDInclusive: s.maxTaskID, + InclusiveMinTaskID: s.ackedTaskID + 1, + ExclusiveMaxTaskID: s.maxTaskID + 1, NextPageToken: nil, }).Return(&persistence.GetTasksResponse{ Tasks: tasks1, @@ -315,8 +315,8 @@ func (s *dbTaskReaderSuite) TestIteration_Pagination() { TaskQueue: s.taskQueueName, TaskType: s.taskQueueType, PageSize: dbTaskReaderPageSize, - MinTaskIDExclusive: s.ackedTaskID, - MaxTaskIDInclusive: s.maxTaskID, + InclusiveMinTaskID: s.ackedTaskID + 1, + ExclusiveMaxTaskID: s.maxTaskID + 1, NextPageToken: token, }).Return(&persistence.GetTasksResponse{ Tasks: tasks2, @@ -359,8 +359,8 @@ func (s *dbTaskReaderSuite) TestIteration_MaxTaskID_Exists() { TaskQueue: s.taskQueueName, TaskType: s.taskQueueType, PageSize: dbTaskReaderPageSize, - MinTaskIDExclusive: s.ackedTaskID, - MaxTaskIDInclusive: s.maxTaskID, + InclusiveMinTaskID: s.ackedTaskID + 1, + ExclusiveMaxTaskID: s.maxTaskID + 1, NextPageToken: nil, }).Return(&persistence.GetTasksResponse{ Tasks: tasks, @@ -398,8 +398,8 @@ func (s *dbTaskReaderSuite) TestIteration_MaxTaskID_Missing() { TaskQueue: s.taskQueueName, TaskType: s.taskQueueType, PageSize: dbTaskReaderPageSize, - MinTaskIDExclusive: s.ackedTaskID, - MaxTaskIDInclusive: s.maxTaskID, + InclusiveMinTaskID: s.ackedTaskID + 1, + ExclusiveMaxTaskID: s.maxTaskID + 1, NextPageToken: nil, }).Return(&persistence.GetTasksResponse{ Tasks: tasks, diff --git a/service/matching/matchingEngine_test.go b/service/matching/matchingEngine_test.go index 780f64c9a10..29c38e0cede 100644 --- a/service/matching/matchingEngine_test.go +++ b/service/matching/matchingEngine_test.go @@ -2031,7 +2031,7 @@ func (m *testTaskManager) CompleteTasksLessThan(request *persistence.CompleteTas keys := tlm.tasks.Keys() for _, key := range keys { id := key.(int64) - if id <= request.TaskID { + if id < request.ExclusiveMaxTaskID { tlm.tasks.Remove(id) } } @@ -2097,7 +2097,7 @@ func (m *testTaskManager) CreateTasks(request *persistence.CreateTasksRequest) ( // GetTasks provides a mock function with given fields: request func (m *testTaskManager) GetTasks(request *persistence.GetTasksRequest) (*persistence.GetTasksResponse, error) { - m.logger.Debug("testTaskManager.GetTasks", tag.ReadLevel(request.MinTaskIDExclusive), tag.ReadLevel(request.MaxTaskIDInclusive)) + m.logger.Debug("testTaskManager.GetTasks", tag.MinLevel(request.InclusiveMinTaskID), tag.MaxLevel(request.ExclusiveMaxTaskID)) tlm := m.getTaskQueueManager(newTestTaskQueueID(namespace.ID(request.NamespaceID), request.TaskQueue, request.TaskType)) tlm.Lock() @@ -2107,10 +2107,10 @@ func (m *testTaskManager) GetTasks(request *persistence.GetTasksRequest) (*persi it := tlm.tasks.Iterator() for it.Next() { taskID := it.Key().(int64) - if taskID <= request.MinTaskIDExclusive { + if taskID < request.InclusiveMinTaskID { continue } - if taskID > request.MaxTaskIDInclusive { + if taskID >= request.ExclusiveMaxTaskID { break } tasks = append(tasks, it.Value().(*persistencespb.AllocatedTaskInfo)) diff --git a/service/matching/taskGC.go b/service/matching/taskGC.go index 0d1ab8068e4..2702cd510ef 100644 --- a/service/matching/taskGC.go +++ b/service/matching/taskGC.go @@ -76,7 +76,7 @@ func (tgc *taskGC) tryDeleteNextBatch(ackLevel int64, ignoreTimeCond bool) { return } tgc.lastDeleteTime = time.Now().UTC() - n, err := tgc.db.CompleteTasksLessThan(ackLevel, batchSize) + n, err := tgc.db.CompleteTasksLessThan(ackLevel+1, batchSize) switch { case err != nil: return diff --git a/service/matching/taskReader.go b/service/matching/taskReader.go index 3c0b80a65cd..c389ad1d97c 100644 --- a/service/matching/taskReader.go +++ b/service/matching/taskReader.go @@ -199,7 +199,7 @@ func (tr *taskReader) getTaskBatchWithRange(readLevel int64, maxReadLevel int64) var response *persistence.GetTasksResponse var err error err = executeWithRetry(func() error { - response, err = tr.tlMgr.db.GetTasks(readLevel, maxReadLevel, tr.tlMgr.config.GetTasksBatchSize()) + response, err = tr.tlMgr.db.GetTasks(readLevel+1, maxReadLevel+1, tr.tlMgr.config.GetTasksBatchSize()) return err }) if err != nil { diff --git a/service/worker/scanner/taskqueue/db.go b/service/worker/scanner/taskqueue/db.go index 7c8cb5acc3e..a03f456dab3 100644 --- a/service/worker/scanner/taskqueue/db.go +++ b/service/worker/scanner/taskqueue/db.go @@ -36,16 +36,16 @@ import ( var retryForeverPolicy = newRetryForeverPolicy() -func (s *Scavenger) completeTasks(key *p.TaskQueueKey, taskID int64, limit int) (int, error) { +func (s *Scavenger) completeTasks(key *p.TaskQueueKey, exclusiveMaxTaskID int64, limit int) (int, error) { var n int var err error err = s.retryForever(func() error { n, err = s.db.CompleteTasksLessThan(&p.CompleteTasksLessThanRequest{ - NamespaceID: key.NamespaceID, - TaskQueueName: key.TaskQueueName, - TaskType: key.TaskQueueType, - TaskID: taskID, - Limit: limit, + NamespaceID: key.NamespaceID, + TaskQueueName: key.TaskQueueName, + TaskType: key.TaskQueueType, + ExclusiveMaxTaskID: exclusiveMaxTaskID, + Limit: limit, }) return err }) @@ -60,8 +60,8 @@ func (s *Scavenger) getTasks(key *p.TaskQueueKey, batchSize int) (*p.GetTasksRes NamespaceID: key.NamespaceID, TaskQueue: key.TaskQueueName, TaskType: key.TaskQueueType, - MinTaskIDExclusive: -1, // get the first N tasks sorted by taskID - MaxTaskIDInclusive: math.MaxInt64, + InclusiveMinTaskID: 0, // get the first N tasks sorted by taskID + ExclusiveMaxTaskID: math.MaxInt64, PageSize: batchSize, }) return err diff --git a/service/worker/scanner/taskqueue/handler.go b/service/worker/scanner/taskqueue/handler.go index 55fb8795011..be14416dd5d 100644 --- a/service/worker/scanner/taskqueue/handler.go +++ b/service/worker/scanner/taskqueue/handler.go @@ -88,8 +88,8 @@ func (s *Scavenger) deleteHandler(key *p.TaskQueueKey, state *taskQueueState) ha } } - taskID := resp.Tasks[nTasks-1].GetTaskId() - if _, err = s.completeTasks(key, taskID, nTasks); err != nil { + lastTaskID := resp.Tasks[nTasks-1].GetTaskId() + if _, err = s.completeTasks(key, lastTaskID+1, nTasks); err != nil { return handlerStatusErr } diff --git a/service/worker/scanner/taskqueue/mocks_test.go b/service/worker/scanner/taskqueue/mocks_test.go index ed7761529ce..977bc2ae243 100644 --- a/service/worker/scanner/taskqueue/mocks_test.go +++ b/service/worker/scanner/taskqueue/mocks_test.go @@ -146,7 +146,7 @@ func (tbl *mockTaskTable) get(count int) []*persistencespb.AllocatedTaskInfo { func (tbl *mockTaskTable) deleteLessThan(id int64, limit int) int { count := 0 for _, t := range tbl.tasks { - if t.GetTaskId() <= id && count < limit { + if t.GetTaskId() < id && count < limit { count++ continue } diff --git a/service/worker/scanner/taskqueue/scavenger_test.go b/service/worker/scanner/taskqueue/scavenger_test.go index ff652a582ee..c4c1105cbc2 100644 --- a/service/worker/scanner/taskqueue/scavenger_test.go +++ b/service/worker/scanner/taskqueue/scavenger_test.go @@ -201,7 +201,7 @@ func (s *ScavengerTestSuite) setupTaskMgrMocks() { }).AnyTimes() s.taskMgr.EXPECT().CompleteTasksLessThan(gomock.Any()).DoAndReturn( func(req *p.CompleteTasksLessThanRequest) (int, error) { - return s.taskTables[req.TaskQueueName].deleteLessThan(req.TaskID, req.Limit), nil + return s.taskTables[req.TaskQueueName].deleteLessThan(req.ExclusiveMaxTaskID, req.Limit), nil }).AnyTimes() } diff --git a/tools/cli/admin.go b/tools/cli/admin.go index 30e5380dd82..44fd9379696 100644 --- a/tools/cli/admin.go +++ b/tools/cli/admin.go @@ -401,12 +401,12 @@ func newAdminTaskQueueCommands() []cli.Command { }, cli.Int64Flag{ Name: FlagMinTaskID, - Usage: "Minimum task Id", + Usage: "Inclusive minimum task Id", Value: -12346, // include default task id }, cli.Int64Flag{ Name: FlagMaxTaskID, - Usage: "Maximum task Id", + Usage: "Exclusive maximum task Id", }, cli.BoolFlag{ Name: FlagPrintJSONWithAlias,