Skip to content

Commit

Permalink
Remove job status track information from redis after stop (#19227)
Browse files Browse the repository at this point in the history
Remove job status track information from redis after stop the job in the queue

  After stop in the queue:
  Remove key in {harbor_job_service_namespace}:job_track:inprogress
  Remove {harbor_job_service_namespace}:job_stats:<job_id>
  fixes #19211

Signed-off-by: stonezdj <[email protected]>
  • Loading branch information
stonezdj authored Sep 5, 2023
1 parent d381185 commit 7f19163
Show file tree
Hide file tree
Showing 2 changed files with 112 additions and 0 deletions.
26 changes: 26 additions & 0 deletions src/pkg/jobmonitor/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/gomodule/redigo/redis"

"github.com/goharbor/harbor/src/common/job"
"github.com/goharbor/harbor/src/jobservice/common/rds"
"github.com/goharbor/harbor/src/jobservice/config"
"github.com/goharbor/harbor/src/lib/log"
libRedis "github.com/goharbor/harbor/src/lib/redis"
Expand Down Expand Up @@ -93,6 +94,10 @@ func (r *redisClientImpl) StopPendingJobs(ctx context.Context, jobType string) (
if err != nil {
return []string{}, err
}
go func() {
// the amount of jobIDs maybe large, so use goroutine to remove the job status tracking info
r.removeJobStatusInRedis(ctx, jobIDs)
}()
if ret < 1 {
// no job in queue removed
return []string{}, fmt.Errorf("no job in the queue removed")
Expand All @@ -102,6 +107,27 @@ func (r *redisClientImpl) StopPendingJobs(ctx context.Context, jobType string) (
return jobIDs, nil
}

// removeJobStatusInRedis remove job status track information from redis, to avoid performance impact when the jobIDs is too large, use batch to remove
func (r *redisClientImpl) removeJobStatusInRedis(ctx context.Context, jobIDs []string) {
conn := r.redisPool.Get()
defer conn.Close()
for _, id := range jobIDs {
namespace := fmt.Sprintf("{%s}", r.namespace)
redisKeyStatus := rds.KeyJobStats(namespace, id)
log.Debugf("delete job status info for job id:%v, key:%v", id, redisKeyStatus)
_, err := conn.Do("DEL", redisKeyStatus)
if err != nil {
log.Warningf("failed to delete the job status info for job %v, %v, continue", id, err)
}
redisKeyInProgress := rds.KeyJobTrackInProgress(namespace)
log.Debugf("delete inprogress info for key:%v, job id:%v", id, redisKeyInProgress)
_, err = conn.Do("HDEL", redisKeyInProgress, id)
if err != nil {
log.Warningf("failed to delete the job info in %v for job %v, %v, continue", rds.KeyJobTrackInProgress(namespace), id, err)
}
}
}

func (r *redisClientImpl) AllJobTypes(ctx context.Context) ([]string, error) {
conn := r.redisPool.Get()
defer conn.Close()
Expand Down
86 changes: 86 additions & 0 deletions src/pkg/jobmonitor/redis_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
// Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package jobmonitor

import (
"context"
"fmt"
"os"
"testing"

"github.com/gomodule/redigo/redis"
"github.com/stretchr/testify/suite"

"github.com/goharbor/harbor/src/common/utils"
"github.com/goharbor/harbor/src/jobservice/common/rds"
"github.com/goharbor/harbor/src/jobservice/config"
)

type RedisClientTestSuite struct {
suite.Suite
redisClient redisClientImpl
redisURL string
}

func (suite *RedisClientTestSuite) SetupSuite() {
redisHost := os.Getenv("REDIS_HOST")
if redisHost == "" {
suite.FailNow("REDIS_HOST is not specified")
}
suite.redisURL = fmt.Sprintf("redis://%s:6379", redisHost)
pool, err := redisPool(&config.RedisPoolConfig{RedisURL: suite.redisURL, Namespace: "{jobservice_namespace}", IdleTimeoutSecond: 30})
suite.redisClient = redisClientImpl{
redisPool: pool,
namespace: "{harbor_job_service_namespace}",
}
if err != nil {
suite.FailNow("failed to create redis client", err)
}
}

func (suite *RedisClientTestSuite) TearDownSuite() {
}

func (suite *RedisClientTestSuite) TestUntrackJobStatusInBatch() {
// create key and value
jobIDs := make([]string, 0)
conn := suite.redisClient.redisPool.Get()
defer conn.Close()
for i := 0; i < 100; i++ {
k := utils.GenerateRandomStringWithLen(10)
jobIDs = append(jobIDs, k)
key := rds.KeyJobStats(fmt.Sprintf("{%s}", suite.redisClient.namespace), k)
v := utils.GenerateRandomStringWithLen(10)
_, err := conn.Do("HSET", key, k, v)
if err != nil {
suite.FailNow("can not insert data to redis", err)
}
}
suite.redisClient.removeJobStatusInRedis(context.Background(), jobIDs)
key := rds.KeyJobStats(fmt.Sprintf("{%s}", suite.redisClient.namespace), "*")
result, err := conn.Do("KEYS", key)
if err != nil {
suite.FailNow("can not get data from redis", err)
}
remains, err := redis.Values(result, err)
if err != nil {
suite.FailNow("can not get data from redis", err)
}
suite.Equal(0, len(remains))
}

func TestRedisClientTestSuite(t *testing.T) {
suite.Run(t, &RedisClientTestSuite{})
}

0 comments on commit 7f19163

Please sign in to comment.