Skip to content

Commit

Permalink
chore: clean the codebase
Browse files Browse the repository at this point in the history
  • Loading branch information
chuang8511 committed Dec 4, 2024
1 parent ce575eb commit 7d79011
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 26 deletions.
File renamed without changes.
28 changes: 18 additions & 10 deletions pkg/service/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -831,6 +831,14 @@ func (s *service) UpdateNamespacePipelineIDByID(ctx context.Context, ns resource
return s.converter.ConvertPipelineToPB(ctx, dbPipeline, pipelinepb.Pipeline_VIEW_FULL, true, true)
}

// preTriggerPipeline does the following:
// 1. Upload pipeline input data to minio if the data is blob data.
// 2. New workflow memory.
// 2-1. Set the default values for the variables for memory data and uploading pipeline data.
// 2-2. Set the data with data.Value for the memory data, which will be used for pipeline running.
// 2-3. Upload "uploading pipeline data" to minio for pipeline run logger.
// 3. Map the settings in recipe to the format in workflow memory.
// 4. Enable the streaming mode when the header contains "text/event-stream"
func (s *service) preTriggerPipeline(ctx context.Context, ns resource.Namespace, r *datamodel.Recipe, pipelineTriggerID string, pipelineData []*pipelinepb.TriggerData) error {
batchSize := len(pipelineData)
if batchSize > constant.MaxBatchSize {
Expand All @@ -845,6 +853,13 @@ func (s *service) preTriggerPipeline(ctx context.Context, ns resource.Namespace,
defaultValueMap[k] = v.Default
}

requesterUID, _ := resourcex.GetRequesterUIDAndUserUID(ctx)

expiryRuleTag, err := s.retentionHandler.GetExpiryTagBySubscriptionPlan(ctx, requesterUID.String())
if err != nil {
return fmt.Errorf("get expiry rule tag: %w", err)
}

errors := []string{}

for idx, data := range pipelineData {
Expand Down Expand Up @@ -1362,17 +1377,10 @@ func (s *service) preTriggerPipeline(ctx context.Context, ns resource.Namespace,
}
}

requesterUID, _ := resourcex.GetRequesterUIDAndUserUID(ctx)

expiryRuleTag, err := s.retentionHandler.GetExpiryTagBySubscriptionPlan(ctx, requesterUID.String())
if err != nil {
return fmt.Errorf("get expiry rule tag: %w", err)
}

err = s.uploadPipelineRunInputsToMinio(ctx, uploadPipelineRunInputsToMinioParam{
PipelineTriggerID: pipelineTriggerID,
ExpiryRuleTag: expiryRuleTag,
PipelineData: uploadingPipelineData,
pipelineTriggerID: pipelineTriggerID,
expiryRuleTag: expiryRuleTag,
pipelineData: uploadingPipelineData,
})
if err != nil {
return fmt.Errorf("pipeline run inputs to minio: %w", err)
Expand Down
28 changes: 14 additions & 14 deletions pkg/service/pipelinerun.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func (s *service) ListPipelineRuns(ctx context.Context, req *pb.ListPipelineRuns
return nil, err
}

isOwner := dbPipeline.OwnerUID().String() == requesterUID.String()
isOwner := dbPipeline.OwnerUID() == requesterUID

pipelineRuns, totalCount, err := s.repository.GetPaginatedPipelineRunsWithPermissions(ctx, requesterUID.String(), dbPipeline.UID.String(),
page, pageSize, filter, orderBy, isOwner)
Expand All @@ -103,7 +103,7 @@ func (s *service) ListPipelineRuns(ctx context.Context, req *pb.ListPipelineRuns

var referenceIDs []string
for _, pipelineRun := range pipelineRuns {
if CanViewPrivateData(pipelineRun.RequesterUID.String(), requesterUID.String()) {
if canViewPrivateData(pipelineRun.RequesterUID, requesterUID) {
for _, input := range pipelineRun.Inputs {
referenceIDs = append(referenceIDs, input.Name)
}
Expand Down Expand Up @@ -154,7 +154,7 @@ func (s *service) ListPipelineRuns(ctx context.Context, req *pb.ListPipelineRuns
pbRun.RequesterId = *requesterID
}

if CanViewPrivateData(run.RequesterUID.String(), requesterUID.String()) {
if canViewPrivateData(run.RequesterUID, requesterUID) {
if len(run.Inputs) == 1 {
key := run.Inputs[0].Name
pbRun.Inputs, err = parseMetadataToStructArray(metadataMap, key)
Expand Down Expand Up @@ -213,7 +213,7 @@ func (s *service) ListComponentRuns(ctx context.Context, req *pb.ListComponentRu
return nil, fmt.Errorf("failed to get pipeline by UID: %s. error: %s", dbPipelineRun.PipelineUID.String(), err.Error())
}

isOwner := dbPipeline.OwnerUID().String() == requesterUID.String()
isOwner := dbPipeline.OwnerUID() == requesterUID

if !isOwner && requesterUID.String() != dbPipelineRun.RequesterUID.String() {
return nil, fmt.Errorf("requester is not pipeline owner/credit owner. they are not allowed to view these component runs")
Expand All @@ -226,7 +226,7 @@ func (s *service) ListComponentRuns(ctx context.Context, req *pb.ListComponentRu

var referenceIDs []string
for _, pipelineRun := range componentRuns {
if CanViewPrivateData(dbPipelineRun.RequesterUID.String(), requesterUID.String()) {
if canViewPrivateData(dbPipelineRun.RequesterUID, requesterUID) {
for _, input := range pipelineRun.Inputs {
referenceIDs = append(referenceIDs, input.Name)
}
Expand Down Expand Up @@ -255,7 +255,7 @@ func (s *service) ListComponentRuns(ctx context.Context, req *pb.ListComponentRu
return nil, fmt.Errorf("failed to convert component run: %w", err)
}

if CanViewPrivateData(dbPipelineRun.RequesterUID.String(), requesterUID.String()) {
if canViewPrivateData(dbPipelineRun.RequesterUID, requesterUID) {
if len(run.Inputs) == 1 {
key := run.Inputs[0].Name
pbRun.Inputs, err = parseMetadataToStructArray(metadataMap, key)
Expand Down Expand Up @@ -384,26 +384,26 @@ func (s *service) ListPipelineRunsByRequester(ctx context.Context, req *pb.ListP
}

type uploadPipelineRunInputsToMinioParam struct {
PipelineTriggerID string
ExpiryRuleTag string
PipelineData []map[string]any
pipelineTriggerID string
expiryRuleTag string
pipelineData []map[string]any
}

func (s *service) uploadPipelineRunInputsToMinio(ctx context.Context, param uploadPipelineRunInputsToMinioParam) error {
logger, _ := logger.GetZapLogger(ctx)
minioClient := s.minioClient
objectName := fmt.Sprintf("pipeline-runs/input/%s.json", param.PipelineTriggerID)
objectName := fmt.Sprintf("pipeline-runs/input/%s.json", param.pipelineTriggerID)

logger.Info("uploadPipelineRunInputsToMinio started",
zap.String("objectName", objectName),
zap.Any("pipelineData", param.PipelineData),
zap.Any("pipelineData", param.pipelineData),
)

url, objectInfo, err := minioClient.UploadFile(ctx, logger, &miniox.UploadFileParam{
FilePath: objectName,
FileContent: param.PipelineData,
FileContent: param.pipelineData,
FileMimeType: constant.ContentTypeJSON,
ExpiryRuleTag: param.ExpiryRuleTag,
ExpiryRuleTag: param.expiryRuleTag,
})
if err != nil {
return fmt.Errorf("upload pipeline run inputs to minio: %w", err)
Expand All @@ -416,7 +416,7 @@ func (s *service) uploadPipelineRunInputsToMinio(ctx context.Context, param uplo
URL: url,
}}

err = s.repository.UpdatePipelineRun(ctx, param.PipelineTriggerID, &datamodel.PipelineRun{Inputs: inputs})
err = s.repository.UpdatePipelineRun(ctx, param.pipelineTriggerID, &datamodel.PipelineRun{Inputs: inputs})
if err != nil {
logger.Error("save pipeline run input data", zap.Error(err))
return err
Expand Down
3 changes: 1 addition & 2 deletions pkg/service/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,8 +179,7 @@ func (s *service) convertComponentRunToPB(run datamodel.ComponentRun) (*pipeline
return result, nil
}

// CanViewPrivateData - only with requester ns could users see their input/output data
func CanViewPrivateData(namespace, requesterUID string) bool {
func canViewPrivateData(namespace, requesterUID uuid.UUID) bool {
return namespace == requesterUID
}

Expand Down

0 comments on commit 7d79011

Please sign in to comment.