Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: correct the operator in the webhook payload #18906

Merged
merged 1 commit into from
Jul 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions make/migrations/postgresql/0120_2.9.0_schema.up.sql
Original file line number Diff line number Diff line change
Expand Up @@ -76,3 +76,12 @@ $$
END LOOP;
END
$$;

/* Refactor the structure of replication schedule callback_func_param, convert the raw id to json object for extending */
/* callback_func_param
Old: 100
New: {"policy_id": 100}
*/
UPDATE schedule SET callback_func_param = json_build_object('policy_id', callback_func_param::int)::text
WHERE vendor_type='REPLICATION'
AND callback_func_param NOT LIKE '%policy_id%';
5 changes: 5 additions & 0 deletions src/controller/event/handler/internal/artifact.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

"github.com/goharbor/harbor/src/controller/artifact"
"github.com/goharbor/harbor/src/controller/event"
"github.com/goharbor/harbor/src/controller/event/operator"
"github.com/goharbor/harbor/src/controller/repository"
"github.com/goharbor/harbor/src/controller/tag"
"github.com/goharbor/harbor/src/jobservice/job"
Expand Down Expand Up @@ -246,6 +247,10 @@ func (a *Handler) asyncFlushPullCount(ctx context.Context) {

func (a *Handler) onPush(ctx context.Context, event *event.ArtifactEvent) error {
go func() {
if event.Operator != "" {
ctx = context.WithValue(ctx, operator.ContextKey{}, event.Operator)
}

if err := autoScan(ctx, &artifact.Artifact{Artifact: *event.Artifact}, event.Tags...); err != nil {
log.Errorf("scan artifact %s@%s failed, error: %v", event.Artifact.RepositoryName, event.Artifact.Digest, err)
}
Expand Down
1 change: 1 addition & 0 deletions src/controller/event/handler/replication/event/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,5 @@ const (
type Event struct {
Type string
Resource *model.Resource
Operator string
}
5 changes: 5 additions & 0 deletions src/controller/event/handler/replication/event/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"errors"
"fmt"

"github.com/goharbor/harbor/src/controller/event/operator"
"github.com/goharbor/harbor/src/controller/replication"
repctlmodel "github.com/goharbor/harbor/src/controller/replication/model"
"github.com/goharbor/harbor/src/lib/log"
Expand Down Expand Up @@ -51,6 +52,10 @@ func Handle(ctx context.Context, event *Event) error {
return nil
}

if event.Operator != "" {
ctx = context.WithValue(ctx, operator.ContextKey{}, event.Operator)
}

for _, policy := range policies {
id, err := replication.Ctl.Start(ctx, policy, event.Resource, task.ExecutionTriggerEvent)
if err != nil {
Expand Down
4 changes: 4 additions & 0 deletions src/controller/event/handler/replication/replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ func (r *Handler) handlePushArtifact(ctx context.Context, event *event.PushArtif
}},
},
},
Operator: event.Operator,
}
return repevent.Handle(ctx, e)
}
Expand Down Expand Up @@ -140,6 +141,7 @@ func (r *Handler) handleDeleteArtifact(ctx context.Context, event *event.DeleteA
},
Deleted: true,
},
Operator: event.Operator,
}
return repevent.Handle(ctx, e)
}
Expand Down Expand Up @@ -180,6 +182,7 @@ func (r *Handler) handleCreateTag(ctx context.Context, event *event.CreateTagEve
}},
},
},
Operator: event.Operator,
}
return repevent.Handle(ctx, e)
}
Expand Down Expand Up @@ -212,6 +215,7 @@ func (r *Handler) handleDeleteTag(ctx context.Context, event *event.DeleteTagEve
Deleted: true,
IsDeleteTag: true,
},
Operator: event.Operator,
}
return repevent.Handle(ctx, e)
}
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ func constructReplicationPayload(ctx context.Context, event *event.ReplicationEv
payload := &model.Payload{
Type: event.EventType,
OccurAt: event.OccurAt.Unix(),
Operator: string(execution.Trigger),
Operator: execution.Operator,
EventData: &model.EventData{
Replication: &ctlModel.Replication{
HarborHostname: hostname,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ func (r *RetentionHandler) constructRetentionPayload(ctx context.Context, event
payload := &model.Payload{
Type: event.EventType,
OccurAt: event.OccurAt.Unix(),
Operator: execution.Trigger,
Operator: execution.Operator,
EventData: &model.EventData{
Retention: &evtModel.Retention{
Total: event.Total,
Expand Down
1 change: 1 addition & 0 deletions src/controller/event/handler/webhook/quota/quota.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ func constructQuotaPayload(event *event.QuotaEvent) (*notifyModel.Payload, error
},
Custom: quotaCustom,
},
Operator: event.Operator,
}

if event.Resource != nil {
Expand Down
6 changes: 4 additions & 2 deletions src/controller/event/metadata/quota.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,9 @@ type QuotaMetaData struct {
// used to define the event topic
Level int
// the msg contains the limitation and current usage of quota
Msg string
OccurAt time.Time
Msg string
OccurAt time.Time
Operator string
}

// Resolve quota exceed into common image event
Expand All @@ -54,6 +55,7 @@ func (q *QuotaMetaData) Resolve(evt *event.Event) error {
OccurAt: q.OccurAt,
RepoName: q.RepoName,
Msg: q.Msg,
Operator: q.Operator,
}
if q.Tag != "" || q.Digest != "" {
data.Resource = &event2.ImgResource{
Expand Down
7 changes: 2 additions & 5 deletions src/controller/event/metadata/scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,11 @@ import (
v1 "github.com/goharbor/harbor/src/pkg/scan/rest/v1"
)

const (
autoTriggeredOperator = "auto"
)

// ScanImageMetaData defines meta data of image scanning event
type ScanImageMetaData struct {
Artifact *v1.Artifact
Status string
Operator string
}

// Resolve image scanning metadata into common chart event
Expand All @@ -57,7 +54,7 @@ func (si *ScanImageMetaData) Resolve(evt *event.Event) error {
EventType: eventType,
Artifact: si.Artifact,
OccurAt: time.Now(),
Operator: autoTriggeredOperator,
Operator: si.Operator,
}

evt.Topic = topic
Expand Down
17 changes: 14 additions & 3 deletions src/controller/event/operator/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,23 @@ import (
"github.com/goharbor/harbor/src/common/security"
)

// ContextKey is the key for storing operator in the context.
type ContextKey struct{}

// FromContext return the event operator from context
func FromContext(ctx context.Context) string {
var operator string
sc, ok := security.FromContext(ctx)
if !ok {
return ""
if ok {
operator = sc.GetUsername()
}
// retrieve from context if not found in security context
if operator == "" {
op, ok := ctx.Value(ContextKey{}).(string)
if ok {
operator = op
}
}

return sc.GetUsername()
return operator
}
47 changes: 47 additions & 0 deletions src/controller/event/operator/operator_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package operator

import (
"context"
"testing"

"github.com/goharbor/harbor/src/common/security"
testsec "github.com/goharbor/harbor/src/testing/common/security"

"github.com/stretchr/testify/assert"
)

func TestFromContext(t *testing.T) {
{
// no security context and operator context should return ""
op := FromContext(context.Background())
assert.Empty(t, op)
}
{
// return operator from security context
secCtx := &testsec.Context{}
secCtx.On("GetUsername").Return("security-context-user")
ctx := security.NewContext(context.Background(), secCtx)
op := FromContext(ctx)
assert.Equal(t, "security-context-user", op)
}
{
// return operator from operator context
ctx := context.WithValue(context.Background(), ContextKey{}, "operator-context-user")
op := FromContext(ctx)
assert.Equal(t, "operator-context-user", op)
}
}
1 change: 1 addition & 0 deletions src/controller/event/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,7 @@ type QuotaEvent struct {
OccurAt time.Time
RepoName string
Msg string
Operator string
}

func (q *QuotaEvent) String() string {
Expand Down
15 changes: 13 additions & 2 deletions src/controller/replication/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"fmt"
"time"

"github.com/goharbor/harbor/src/controller/event/operator"
"github.com/goharbor/harbor/src/controller/replication/flow"
replicationmodel "github.com/goharbor/harbor/src/controller/replication/model"
"github.com/goharbor/harbor/src/jobservice/job"
Expand Down Expand Up @@ -104,7 +105,11 @@ func (c *controller) Start(ctx context.Context, policy *replicationmodel.Policy,
WithMessage("the policy %d is disabled", policy.ID)
}
// create an execution record
id, err := c.execMgr.Create(ctx, job.ReplicationVendorType, policy.ID, trigger)
extra := make(map[string]interface{})
if op := operator.FromContext(ctx); op != "" {
extra["operator"] = op
}
id, err := c.execMgr.Create(ctx, job.ReplicationVendorType, policy.ID, trigger, extra)
if err != nil {
return 0, err
}
Expand Down Expand Up @@ -263,7 +268,7 @@ func (c *controller) GetTaskLog(ctx context.Context, id int64) ([]byte, error) {
}

func convertExecution(exec *task.Execution) *Execution {
return &Execution{
replicationExec := &Execution{
ID: exec.ID,
PolicyID: exec.VendorID,
Status: exec.Status,
Expand All @@ -273,6 +278,12 @@ func convertExecution(exec *task.Execution) *Execution {
StartTime: exec.StartTime,
EndTime: exec.EndTime,
}

if operator, ok := exec.ExtraAttrs["operator"].(string); ok {
replicationExec.Operator = operator
}

return replicationExec
}

func convertTask(task *task.Task) *Task {
Expand Down
4 changes: 2 additions & 2 deletions src/controller/replication/execution_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func (r *replicationTestSuite) TestStart() {
r.Require().NotNil(err)

// got error when running the replication flow
r.execMgr.On("Create", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(int64(1), nil)
r.execMgr.On("Create", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(int64(1), nil)
r.execMgr.On("Get", mock.Anything, mock.Anything).Return(&task.Execution{}, nil)
r.execMgr.On("StopAndWait", mock.Anything, mock.Anything, mock.Anything).Return(nil)
r.execMgr.On("MarkError", mock.Anything, mock.Anything, mock.Anything).Return(nil)
Expand All @@ -91,7 +91,7 @@ func (r *replicationTestSuite) TestStart() {
r.SetupTest()

// got no error when running the replication flow
r.execMgr.On("Create", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(int64(1), nil)
r.execMgr.On("Create", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(int64(1), nil)
r.execMgr.On("Get", mock.Anything, mock.Anything).Return(&task.Execution{}, nil)
r.flowCtl.On("Start", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
r.ormCreator.On("Create").Return(nil)
Expand Down
1 change: 1 addition & 0 deletions src/controller/replication/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ type Execution struct {
StatusMessage string
Metrics *dao.Metrics
Trigger string
Operator string
StartTime time.Time
EndTime time.Time
}
Expand Down
32 changes: 27 additions & 5 deletions src/controller/replication/policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@ package replication

import (
"context"
"strconv"
"encoding/json"

"github.com/goharbor/harbor/src/common/secret"
"github.com/goharbor/harbor/src/controller/event/operator"
"github.com/goharbor/harbor/src/controller/replication/model"
"github.com/goharbor/harbor/src/jobservice/job"
"github.com/goharbor/harbor/src/lib/log"
Expand All @@ -31,10 +33,20 @@ const callbackFuncName = "REPLICATION_CALLBACK"

func init() {
callbackFunc := func(ctx context.Context, param string) error {
policyID, err := strconv.ParseInt(param, 10, 64)
if err != nil {
params := make(map[string]interface{})
if err := json.Unmarshal([]byte(param), &params); err != nil {
return err
}

var policyID int64
if id, ok := params["policy_id"].(float64); ok {
policyID = int64(id)
}

if op, ok := params["operator"].(string); ok {
ctx = context.WithValue(ctx, operator.ContextKey{}, op)
}

policy, err := Ctl.GetPolicy(ctx, policyID)
if err != nil {
return err
Expand Down Expand Up @@ -121,8 +133,13 @@ func (c *controller) CreatePolicy(ctx context.Context, policy *model.Policy) (in
}
// create schedule if needed
if policy.IsScheduledTrigger() {
cbParams := map[string]interface{}{
"policy_id": id,
// the operator of schedule job is harbor-jobservice
"operator": secret.JobserviceUser,
}
if _, err = c.scheduler.Schedule(ctx, job.ReplicationVendorType, id, "", policy.Trigger.Settings.Cron,
callbackFuncName, id, map[string]interface{}{}); err != nil {
callbackFuncName, cbParams, map[string]interface{}{}); err != nil {
return 0, err
}
}
Expand All @@ -148,8 +165,13 @@ func (c *controller) UpdatePolicy(ctx context.Context, policy *model.Policy, pro
}
// create schedule if needed
if policy.IsScheduledTrigger() {
cbParams := map[string]interface{}{
"policy_id": policy.ID,
// the operator of schedule job is harbor-jobservice
"operator": secret.JobserviceUser,
}
if _, err := c.scheduler.Schedule(ctx, job.ReplicationVendorType, policy.ID, "", policy.Trigger.Settings.Cron,
callbackFuncName, policy.ID, map[string]interface{}{}); err != nil {
callbackFuncName, cbParams, map[string]interface{}{}); err != nil {
return err
}
}
Expand Down
Loading