diff --git a/src/pkg/jobmonitor/redis.go b/src/pkg/jobmonitor/redis.go index 06ec3b84a46..212fa8d1996 100644 --- a/src/pkg/jobmonitor/redis.go +++ b/src/pkg/jobmonitor/redis.go @@ -23,6 +23,7 @@ import ( "github.com/gomodule/redigo/redis" "github.com/goharbor/harbor/src/common/job" + "github.com/goharbor/harbor/src/jobservice/common/rds" "github.com/goharbor/harbor/src/jobservice/config" "github.com/goharbor/harbor/src/lib/log" libRedis "github.com/goharbor/harbor/src/lib/redis" @@ -93,6 +94,10 @@ func (r *redisClientImpl) StopPendingJobs(ctx context.Context, jobType string) ( if err != nil { return []string{}, err } + go func() { + // the amount of jobIDs maybe large, so use goroutine to remove the job status tracking info + r.removeJobStatusInRedis(ctx, jobIDs) + }() if ret < 1 { // no job in queue removed return []string{}, fmt.Errorf("no job in the queue removed") @@ -102,6 +107,27 @@ func (r *redisClientImpl) StopPendingJobs(ctx context.Context, jobType string) ( return jobIDs, nil } +// removeJobStatusInRedis remove job status track information from redis, to avoid performance impact when the jobIDs is too large, use batch to remove +func (r *redisClientImpl) removeJobStatusInRedis(ctx context.Context, jobIDs []string) { + conn := r.redisPool.Get() + defer conn.Close() + for _, id := range jobIDs { + namespace := fmt.Sprintf("{%s}", r.namespace) + redisKeyStatus := rds.KeyJobStats(namespace, id) + log.Debugf("delete job status info for job id:%v, key:%v", id, redisKeyStatus) + _, err := conn.Do("DEL", redisKeyStatus) + if err != nil { + log.Warningf("failed to delete the job status info for job %v, %v, continue", id, err) + } + redisKeyInProgress := rds.KeyJobTrackInProgress(namespace) + log.Debugf("delete inprogress info for key:%v, job id:%v", id, redisKeyInProgress) + _, err = conn.Do("HDEL", redisKeyInProgress, id) + if err != nil { + log.Warningf("failed to delete the job info in %v for job %v, %v, continue", rds.KeyJobTrackInProgress(namespace), id, err) + } + } +} + func (r *redisClientImpl) AllJobTypes(ctx context.Context) ([]string, error) { conn := r.redisPool.Get() defer conn.Close() diff --git a/src/pkg/jobmonitor/redis_test.go b/src/pkg/jobmonitor/redis_test.go new file mode 100644 index 00000000000..b902f569ae3 --- /dev/null +++ b/src/pkg/jobmonitor/redis_test.go @@ -0,0 +1,86 @@ +// Copyright Project Harbor Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package jobmonitor + +import ( + "context" + "fmt" + "os" + "testing" + + "github.com/gomodule/redigo/redis" + "github.com/stretchr/testify/suite" + + "github.com/goharbor/harbor/src/common/utils" + "github.com/goharbor/harbor/src/jobservice/common/rds" + "github.com/goharbor/harbor/src/jobservice/config" +) + +type RedisClientTestSuite struct { + suite.Suite + redisClient redisClientImpl + redisURL string +} + +func (suite *RedisClientTestSuite) SetupSuite() { + redisHost := os.Getenv("REDIS_HOST") + if redisHost == "" { + suite.FailNow("REDIS_HOST is not specified") + } + suite.redisURL = fmt.Sprintf("redis://%s:6379", redisHost) + pool, err := redisPool(&config.RedisPoolConfig{RedisURL: suite.redisURL, Namespace: "{jobservice_namespace}", IdleTimeoutSecond: 30}) + suite.redisClient = redisClientImpl{ + redisPool: pool, + namespace: "{harbor_job_service_namespace}", + } + if err != nil { + suite.FailNow("failed to create redis client", err) + } +} + +func (suite *RedisClientTestSuite) TearDownSuite() { +} + +func (suite *RedisClientTestSuite) TestUntrackJobStatusInBatch() { + // create key and value + jobIDs := make([]string, 0) + conn := suite.redisClient.redisPool.Get() + defer conn.Close() + for i := 0; i < 100; i++ { + k := utils.GenerateRandomStringWithLen(10) + jobIDs = append(jobIDs, k) + key := rds.KeyJobStats(fmt.Sprintf("{%s}", suite.redisClient.namespace), k) + v := utils.GenerateRandomStringWithLen(10) + _, err := conn.Do("HSET", key, k, v) + if err != nil { + suite.FailNow("can not insert data to redis", err) + } + } + suite.redisClient.removeJobStatusInRedis(context.Background(), jobIDs) + key := rds.KeyJobStats(fmt.Sprintf("{%s}", suite.redisClient.namespace), "*") + result, err := conn.Do("KEYS", key) + if err != nil { + suite.FailNow("can not get data from redis", err) + } + remains, err := redis.Values(result, err) + if err != nil { + suite.FailNow("can not get data from redis", err) + } + suite.Equal(0, len(remains)) +} + +func TestRedisClientTestSuite(t *testing.T) { + suite.Run(t, &RedisClientTestSuite{}) +}